Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(97)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 11886013: Make Stream transformation respect the single/multi subscriber nature of the source. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Added missing isSingleSubscription impl. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Utility function to create an [AsyncError] if [error] isn't one already. */ 7 /** Utility function to create an [AsyncError] if [error] isn't one already. */
8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) { 8 AsyncError _asyncError(Object error, Object stackTrace, [AsyncError cause]) {
9 if (error is AsyncError) return error; 9 if (error is AsyncError) return error;
10 if (cause == null) return new AsyncError(error, stackTrace); 10 if (cause == null) return new AsyncError(error, stackTrace);
(...skipping 20 matching lines...) Expand all
31 31
32 /** Helper function to make an onError argument to [_runUserCode]. */ 32 /** Helper function to make an onError argument to [_runUserCode]. */
33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) => 33 _cancelAndError(StreamSubscription subscription, _FutureImpl future) =>
34 (AsyncError error) { 34 (AsyncError error) {
35 subscription.cancel(); 35 subscription.cancel();
36 future._setError(error); 36 future._setError(error);
37 }; 37 };
38 38
39 39
40 /** 40 /**
41 * A wrapper around a stream that allows independent subscribers. 41 * A [StreamTransformer] that forwards events and subscriptions.
42 * 42 *
43 * By default [this] subscribes to [_source] and forwards all events to its own 43 * By default this transformer subscribes to [_source] and forwards all events
44 * subscribers. It does not subscribe until there is a subscriber, and 44 * to [_stream]. It does not subscribe to [_source] until there is a subscriber,
45 * unsubscribes again when there are no subscribers left. 45 * on [_stream] and unsubscribes again when there are no subscribers left.
46 * 46 *
47 * The events are passed through the [_handleData], [_handleError] and 47 * The events are passed through the [_handleData], [_handleError] and
48 * [_handleDone] methods. Subclasses are supposed to add handling of some of 48 * [_handleDone] methods. Subclasses are supposed to add handling of some of
49 * the events by overriding these methods. 49 * the events by overriding these methods.
50 * 50 *
51 * This class is intended for internal use only. 51 * This class is intended for internal use only.
52 */ 52 */
53 class _ForwardingMultiStream<S, T> extends _MultiStreamImpl<T> { 53 /**
54 Stream<S> _source = null; 54 *
55 StreamSubscription _subscription = null; 55 * Handles backwards propagation of subscription and pause.
56 */
57 class _ForwardingStreamTransformer<S, T> implements StreamTransformer<S, T> {
58 Stream<T> _stream;
59 Stream<S> _source;
60 StreamSubscription<S> _subscription;
56 61
57 void _subscribeToSource() { 62 Stream<T> _createOutputStream() {
58 _subscription = _source.listen(this._handleData, 63 if (_source.isSingleSubscription) {
59 onError: this._handleError, 64 return new _ForwardingSingleStream<T>(this);
60 onDone: this._handleDone); 65 }
61 if (_isPaused) { 66 return new _ForwardingMultiStream<T>(this);
62 _subscription.pause(); 67 }
68
69 Stream<T> bind(Stream<S> source) {
70 if (_source != null) {
71 throw new StateError("Transformer source already bound");
72 }
73 _source = source;
74 _stream = _createOutputStream();
75 return _stream;
76 }
77
78 void _onPauseStateChange(bool isPaused) {
79 if (isPaused) {
80 if (_subscription != null) {
81 _subscription.pause();
82 }
83 } else {
84 if (_subscription != null) {
85 _subscription.resume();
86 }
63 } 87 }
64 } 88 }
65 89
66 /** 90 /**
67 * Subscribe or unsubscribe on [source] depending on whether 91 * Subscribe or unsubscribe on [_source] depending on whether
68 * [stream] has subscribers. 92 * [_stream] has subscribers.
69 */ 93 */
70 void _onSubscriptionStateChange() { 94 void _onSubscriptionStateChange(bool hasSubscribers) {
71 if (_hasSubscribers) { 95 if (hasSubscribers) {
72 assert(_subscription == null); 96 assert(_subscription == null);
73 if (_source != null) { 97 _subscription = _source.listen(this._handleData,
74 _subscribeToSource(); 98 onError: this._handleError,
75 } 99 onDone: this._handleDone);
76 } else { 100 } else {
77 if (_subscription != null) { 101 // TODO(lrn): Check why this can happen.
78 _subscription.cancel(); 102 if (_subscription == null) return;
79 _subscription = null; 103 _subscription.cancel();
80 } 104 _subscription = null;
81 }
82 }
83
84 void _onPauseStateChange() {
85 if (_subscription == null) return;
86 if (isPaused) {
87 _subscription.pause();
88 } else {
89 _subscription.resume();
90 } 105 }
91 } 106 }
92 107
93 void _handleData(S inputEvent) { 108 void _handleData(S inputEvent) {
94 var outputEvent = inputEvent; 109 var outputEvent = inputEvent;
95 _add(outputEvent); 110 _stream._add(outputEvent);
96 } 111 }
97 112
98 void _handleError(AsyncError error) { 113 void _handleError(AsyncError error) {
99 _signalError(error); 114 _stream._signalError(error);
100 } 115 }
101 116
102 void _handleDone() { 117 void _handleDone() {
103 _close(); 118 _stream._close();
104 } 119 }
105 } 120 }
106 121
122 class _ForwardingMultiStream<T> extends _MultiStreamImpl<T> {
123 _ForwardingStreamTransformer _transformer;
124 _ForwardingMultiStream(this._transformer);
107 125
108 abstract class _ForwardingTransformer<S, T> extends _ForwardingMultiStream<S, T> 126 _onSubscriptionStateChange() {
109 implements StreamTransformer<S, T> { 127 _transformer._onSubscriptionStateChange(_hasSubscribers);
110 Stream<T> bind(Stream<S> source) { 128 }
111 if (_source != null) throw new StateError("Already bound to source."); 129
112 _source = source; 130 _onPauseStateChange() {
113 if (_hasSubscribers) { 131 _transformer._onPauseStateChange(_isPaused);
114 _subscribeToSource();
115 }
116 return this;
117 } 132 }
118 } 133 }
119 134
135 class _ForwardingSingleStream<T> extends _SingleStreamImpl<T> {
136 _ForwardingStreamTransformer _transformer;
137 _ForwardingSingleStream(this._transformer);
138
139 _onSubscriptionStateChange() {
140 _transformer._onSubscriptionStateChange(_hasSubscribers);
141 }
142
143 _onPauseStateChange() {
144 _transformer._onPauseStateChange(_isPaused);
145 }
146 }
147
148
120 // ------------------------------------------------------------------- 149 // -------------------------------------------------------------------
121 // Stream transformers used by the default Stream implementation. 150 // Stream transformers used by the default Stream implementation.
122 // ------------------------------------------------------------------- 151 // -------------------------------------------------------------------
123 152
124 typedef bool _Predicate<T>(T value); 153 typedef bool _Predicate<T>(T value);
125 154
126 class WhereStream<T> extends _ForwardingTransformer<T, T> { 155 class WhereTransformer<T> extends _ForwardingStreamTransformer<T, T> {
127 final _Predicate<T> _test; 156 final _Predicate<T> _test;
128 157
129 WhereStream(bool test(T value)) 158 WhereTransformer(bool test(T value))
130 : this._test = test; 159 : this._test = test;
131 160
132 void _handleData(T inputEvent) { 161 void _handleData(T inputEvent) {
133 bool satisfies; 162 bool satisfies;
134 try { 163 try {
135 satisfies = _test(inputEvent); 164 satisfies = _test(inputEvent);
136 } catch (e, s) { 165 } catch (e, s) {
137 _signalError(_asyncError(e, s)); 166 _stream._signalError(_asyncError(e, s));
138 return; 167 return;
139 } 168 }
140 if (satisfies) { 169 if (satisfies) {
141 _add(inputEvent); 170 _stream._add(inputEvent);
142 } 171 }
143 } 172 }
144 } 173 }
145 174
146 175
147 typedef T _Transformation<S, T>(S value); 176 typedef T _Transformation<S, T>(S value);
148 177
149 /** 178 /**
150 * A stream pipe that converts data events before passing them on. 179 * A stream pipe that converts data events before passing them on.
151 */ 180 */
152 class MapStream<S, T> extends _ForwardingTransformer<S, T> { 181 class MapTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
153 final _Transformation _transform; 182 final _Transformation _transform;
154 183
155 MapStream(T transform(S event)) 184 MapTransformer(T transform(S event))
156 : this._transform = transform; 185 : this._transform = transform;
157 186
158 void _handleData(S inputEvent) { 187 void _handleData(S inputEvent) {
159 T outputEvent; 188 T outputEvent;
160 try { 189 try {
161 outputEvent = _transform(inputEvent); 190 outputEvent = _transform(inputEvent);
162 } catch (e, s) { 191 } catch (e, s) {
163 _signalError(_asyncError(e, s)); 192 _stream._signalError(_asyncError(e, s));
164 return; 193 return;
165 } 194 }
166 _add(outputEvent); 195 _stream._add(outputEvent);
167 } 196 }
168 } 197 }
169 198
170 /** 199 /**
171 * A stream pipe that converts data events before passing them on. 200 * A stream pipe that converts data events before passing them on.
172 */ 201 */
173 class ExpandStream<S, T> extends _ForwardingTransformer<S, T> { 202 class ExpandTransformer<S, T> extends _ForwardingStreamTransformer<S, T> {
174 final _Transformation<S, Iterable<T>> _expand; 203 final _Transformation<S, Iterable<T>> _expand;
175 204
176 ExpandStream(Iterable<T> expand(S event)) 205 ExpandTransformer(Iterable<T> expand(S event))
177 : this._expand = expand; 206 : this._expand = expand;
178 207
179 void _handleData(S inputEvent) { 208 void _handleData(S inputEvent) {
180 try { 209 try {
181 for (T value in _expand(inputEvent)) { 210 for (T value in _expand(inputEvent)) {
182 _add(value); 211 _stream._add(value);
183 } 212 }
184 } catch (e, s) { 213 } catch (e, s) {
185 // If either _expand or iterating the generated iterator throws, 214 // If either _expand or iterating the generated iterator throws,
186 // we abort the iteration. 215 // we abort the iteration.
187 _signalError(_asyncError(e, s)); 216 _stream._signalError(_asyncError(e, s));
188 } 217 }
189 } 218 }
190 } 219 }
191 220
192 221
193 typedef void _ErrorTransformation(AsyncError error); 222 typedef void _ErrorTransformation(AsyncError error);
194 typedef bool _ErrorTest(error); 223 typedef bool _ErrorTest(error);
195 224
196 /** 225 /**
197 * A stream pipe that converts or disposes error events 226 * A stream pipe that converts or disposes error events
198 * before passing them on. 227 * before passing them on.
199 */ 228 */
200 class HandleErrorStream<T> extends _ForwardingTransformer<T, T> { 229 class HandleErrorTransformer<T> extends _ForwardingStreamTransformer<T, T> {
201 final _ErrorTransformation _transform; 230 final _ErrorTransformation _transform;
202 final _ErrorTest _test; 231 final _ErrorTest _test;
203 232
204 HandleErrorStream(void transform(AsyncError event), bool test(error)) 233 HandleErrorTransformer(void transform(AsyncError event), bool test(error))
205 : this._transform = transform, this._test = test; 234 : this._transform = transform, this._test = test;
206 235
207 void _handleError(AsyncError error) { 236 void _handleError(AsyncError error) {
208 bool matches = true; 237 bool matches = true;
209 if (_test != null) { 238 if (_test != null) {
210 try { 239 try {
211 matches = _test(error.error); 240 matches = _test(error.error);
212 } catch (e, s) { 241 } catch (e, s) {
213 _signalError(_asyncError(e, s, error)); 242 _stream._signalError(_asyncError(e, s, error));
214 return; 243 return;
215 } 244 }
216 } 245 }
217 if (matches) { 246 if (matches) {
218 try { 247 try {
219 _transform(error); 248 _transform(error);
220 } catch (e, s) { 249 } catch (e, s) {
221 _signalError(_asyncError(e, s, error)); 250 _stream._signalError(_asyncError(e, s, error));
222 return; 251 return;
223 } 252 }
224 } else { 253 } else {
225 _signalError(error); 254 _stream._signalError(error);
226 } 255 }
227 } 256 }
228 } 257 }
229 258
230 259
231 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink); 260 typedef void _TransformDataHandler<S, T>(S data, StreamSink<T> sink);
232 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink); 261 typedef void _TransformErrorHandler<T>(AsyncError data, StreamSink<T> sink);
233 typedef void _TransformDoneHandler<T>(StreamSink<T> sink); 262 typedef void _TransformDoneHandler<T>(StreamSink<T> sink);
234 263
235 /** 264 /**
236 * A stream pipe that intercepts all events and can generate any event as 265 * A stream transfomer that intercepts all events and can generate any event as
237 * output. 266 * output.
238 * 267 *
239 * Each incoming event on this [StreamSink] is passed to the corresponding 268 * Each incoming event on the source stream is passed to the corresponding
240 * provided event handler, along with a [StreamSink] linked to the [output] of 269 * provided event handler, along with a [StreamSink] linked to the output
241 * this pipe. 270 * Stream.
242 * The handler can then decide which events to send to the output 271 * The handler can then decide exactly which events to send to the output.
243 */ 272 */
244 class PipeStream<S, T> extends _ForwardingTransformer<S, T> { 273 class _StreamTransformerImpl<S, T> extends _ForwardingStreamTransformer<S, T> {
245 final _TransformDataHandler<S, T> _onData; 274 final _TransformDataHandler<S, T> _onData;
246 final _TransformErrorHandler<T> _onError; 275 final _TransformErrorHandler<T> _onError;
247 final _TransformDoneHandler<T> _onDone; 276 final _TransformDoneHandler<T> _onDone;
248 StreamSink<T> _sink; 277 StreamSink<T> _sink;
249 278
250 PipeStream({void onData(S data, StreamSink<T> sink), 279 _StreamTransformerImpl(void onData(S data, StreamSink<T> sink),
251 void onError(AsyncError data, StreamSink<T> sink), 280 void onError(AsyncError data, StreamSink<T> sink),
252 void onDone(StreamSink<T> sink)}) 281 void onDone(StreamSink<T> sink))
253 : this._onData = (onData == null ? _defaultHandleData : onData), 282 : this._onData = (onData == null ? _defaultHandleData : onData),
254 this._onError = (onError == null ? _defaultHandleError : onError), 283 this._onError = (onError == null ? _defaultHandleError : onError),
255 this._onDone = (onDone == null ? _defaultHandleDone : onDone) { 284 this._onDone = (onDone == null ? _defaultHandleDone : onDone);
256 // Cache the sink wrapper to avoid creating a new one for each event. 285
257 this._sink = new _StreamImplSink(this); 286 Stream<T> bind(Stream<S> source) {
287 Stream<T> stream = super.bind(source);
288 // Cache a Sink object to avoid creating a new one for each event.
289 _sink = new _StreamImplSink(stream);
290 return stream;
258 } 291 }
259 292
260 void _handleData(S data) { 293 void _handleData(S data) {
261 try { 294 try {
262 return _onData(data, _sink); 295 _onData(data, _sink);
263 } catch (e, s) { 296 } catch (e, s) {
264 _signalError(_asyncError(e, s)); 297 _stream._signalError(_asyncError(e, s));
265 } 298 }
266 } 299 }
267 300
268 void _handleError(AsyncError error) { 301 void _handleError(AsyncError error) {
269 try { 302 try {
270 _onError(error, _sink); 303 _onError(error, _sink);
271 } catch (e, s) { 304 } catch (e, s) {
272 _signalError(_asyncError(e, s, error)); 305 _stream._signalError(_asyncError(e, s, error));
273 } 306 }
274 } 307 }
275 308
276 void _handleDone() { 309 void _handleDone() {
277 try { 310 try {
278 _onDone(_sink); 311 _onDone(_sink);
279 } catch (e, s) { 312 } catch (e, s) {
280 _signalError(_asyncError(e, s)); 313 _stream._signalError(_asyncError(e, s));
281 } 314 }
282 } 315 }
283 316
284 /** Default data handler forwards all data. */ 317 /** Default data handler forwards all data. */
285 static void _defaultHandleData(dynamic data, StreamSink sink) { 318 static void _defaultHandleData(var data, StreamSink sink) {
286 sink.add(data); 319 sink.add(data);
287 } 320 }
288 /** Default error handler forwards all errors. */ 321 /** Default error handler forwards all errors. */
289 static void _defaultHandleError(AsyncError error, StreamSink sink) { 322 static void _defaultHandleError(AsyncError error, StreamSink sink) {
290 sink.signalError(error); 323 sink.signalError(error);
291 } 324 }
292 /** Default done handler forwards done. */ 325 /** Default done handler forwards done. */
293 static void _defaultHandleDone(StreamSink sink) { 326 static void _defaultHandleDone(StreamSink sink) {
294 sink.close(); 327 sink.close();
295 } 328 }
296 } 329 }
297 330
298 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */ 331 /** Creates a [StreamSink] from a [_StreamImpl]'s input methods. */
299 class _StreamImplSink<T> implements StreamSink<T> { 332 class _StreamImplSink<T> implements StreamSink<T> {
300 _StreamImpl<T> _target; 333 _StreamImpl<T> _target;
301 _StreamImplSink(this._target); 334 _StreamImplSink(this._target);
302 void add(T data) { _target._add(data); } 335 void add(T data) { _target._add(data); }
303 void signalError(AsyncError error) { _target._signalError(error); } 336 void signalError(AsyncError error) { _target._signalError(error); }
304 void close() { _target._close(); } 337 void close() { _target._close(); }
305 } 338 }
306 339
307 /**
308 * A stream pipe that intercepts all events and can generate any event as
309 * output.
310 *
311 * Each incoming event on this [StreamSink] is passed to the corresponding
312 * method on [transform], along with a [StreamSink] linked to the [output] of
313 * this pipe.
314 * The handler can then decide which events to send to the output
315 */
316 class TransformStream<S, T> extends _ForwardingTransformer<S, T> {
317 final StreamTransformer<S, T> _transform;
318 StreamSink<T> _sink;
319 340
320 TransformStream(StreamTransformer<S, T> transform) 341 class TakeTransformer<T> extends _ForwardingStreamTransformer<T, T> {
321 : this._transform = transform {
322 // Cache the sink wrapper to avoid creating a new one for each event.
323 this._sink = new _StreamImplSink(this);
324 }
325
326 void _handleData(S data) {
327 try {
328 return _transform.handleData(data, _sink);
329 } catch (e, s) {
330 _controller.signalError(_asyncError(e, s));
331 }
332 }
333
334 void _handleError(AsyncError error) {
335 try {
336 _transform.handleError(error, _sink);
337 } catch (e, s) {
338 _controller.signalError(_asyncError(e, s, error));
339 }
340 }
341
342 void _handleDone() {
343 try {
344 _transform.handleDone(_sink);
345 } catch (e, s) {
346 _controller.signalError(_asyncError(e, s));
347 }
348 }
349 }
350
351
352 /** Helper class for transforming three functions into a StreamTransformer. */
353 class _StreamTransformerFunctionWrapper<S, T>
354 extends _StreamTransformer<S, T> {
355 final _TransformDataHandler<S, T> _handleData;
356 final _TransformErrorHandler<T> _handleError;
357 final _TransformDoneHandler<T> _handleDone;
358
359 _StreamTransformerFunctionWrapper({
360 void onData(S data, StreamSink<T> sink),
361 void onError(AsyncError data, StreamSink<T> sink),
362 void onDone(StreamSink<T> sink)})
363 : _handleData = onData != null ? onData : PipeStream._defaultHandleData,
364 _handleError = onError != null ? onError
365 : PipeStream._defaultHandleError,
366 _handleDone = onDone != null ? onDone : PipeStream._defaultHandleDone;
367
368 void handleData(S data, StreamSink<T> sink) {
369 return _handleData(data, sink);
370 }
371
372 void handleError(AsyncError error, StreamSink<T> sink) {
373 _handleError(error, sink);
374 }
375
376 void handleDone(StreamSink<T> sink) {
377 _handleDone(sink);
378 }
379 }
380
381
382 class TakeStream<T> extends _ForwardingTransformer<T, T> {
383 int _remaining; 342 int _remaining;
384 343
385 TakeStream(int count) 344 TakeTransformer(int count)
386 : this._remaining = count { 345 : this._remaining = count {
387 // This test is done early to avoid handling an async error 346 // This test is done early to avoid handling an async error
388 // in the _handleData method. 347 // in the _handleData method.
389 if (count is! int) throw new ArgumentError(count); 348 if (count is! int) throw new ArgumentError(count);
390 } 349 }
391 350
392 void _handleData(T inputEvent) { 351 void _handleData(T inputEvent) {
393 if (_remaining > 0) { 352 if (_remaining > 0) {
394 _add(inputEvent); 353 _stream._add(inputEvent);
395 _remaining -= 1; 354 _remaining -= 1;
396 if (_remaining == 0) { 355 if (_remaining == 0) {
397 // Closing also unsubscribes all subscribers, which unsubscribes 356 // Closing also unsubscribes all subscribers, which unsubscribes
398 // this from source. 357 // this from source.
399 _close(); 358 _stream._close();
400 } 359 }
401 } 360 }
402 } 361 }
403 } 362 }
404 363
405 364
406 class TakeWhileStream<T> extends _ForwardingTransformer<T, T> { 365 class TakeWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
407 final _Predicate<T> _test; 366 final _Predicate<T> _test;
408 367
409 TakeWhileStream(bool test(T value)) 368 TakeWhileTransformer(bool test(T value))
410 : this._test = test; 369 : this._test = test;
411 370
412 void _handleData(T inputEvent) { 371 void _handleData(T inputEvent) {
413 bool satisfies; 372 bool satisfies;
414 try { 373 try {
415 satisfies = _test(inputEvent); 374 satisfies = _test(inputEvent);
416 } catch (e, s) { 375 } catch (e, s) {
417 _signalError(_asyncError(e, s)); 376 _stream._signalError(_asyncError(e, s));
418 // The test didn't say true. Didn't say false either, but we stop anyway. 377 // The test didn't say true. Didn't say false either, but we stop anyway.
419 _close(); 378 _stream._close();
420 return; 379 return;
421 } 380 }
422 if (satisfies) { 381 if (satisfies) {
423 _add(inputEvent); 382 _stream._add(inputEvent);
424 } else { 383 } else {
425 _close(); 384 _stream._close();
426 } 385 }
427 } 386 }
428 } 387 }
429 388
430 class SkipStream<T> extends _ForwardingTransformer<T, T> { 389 class SkipTransformer<T> extends _ForwardingStreamTransformer<T, T> {
431 int _remaining; 390 int _remaining;
432 391
433 SkipStream(int count) 392 SkipTransformer(int count)
434 : this._remaining = count{ 393 : this._remaining = count{
435 // This test is done early to avoid handling an async error 394 // This test is done early to avoid handling an async error
436 // in the _handleData method. 395 // in the _handleData method.
437 if (count is! int) throw new ArgumentError(count); 396 if (count is! int || count < 0) throw new ArgumentError(count);
438 } 397 }
439 398
440 void _handleData(T inputEvent) { 399 void _handleData(T inputEvent) {
441 if (_remaining > 0) { 400 if (_remaining > 0) {
442 _remaining--; 401 _remaining--;
443 return; 402 return;
444 } 403 }
445 return _add(inputEvent); 404 return _stream._add(inputEvent);
446 } 405 }
447 } 406 }
448 407
449 class SkipWhileStream<T> extends _ForwardingTransformer<T, T> { 408 class SkipWhileTransformer<T> extends _ForwardingStreamTransformer<T, T> {
450 final _Predicate<T> _test; 409 final _Predicate<T> _test;
451 bool _hasFailed = false; 410 bool _hasFailed = false;
452 411
453 SkipWhileStream(bool test(T value)) 412 SkipWhileTransformer(bool test(T value))
454 : this._test = test; 413 : this._test = test;
455 414
456 void _handleData(T inputEvent) { 415 void _handleData(T inputEvent) {
457 if (_hasFailed) { 416 if (_hasFailed) {
458 _add(inputEvent); 417 _stream._add(inputEvent);
459 } 418 }
460 bool satisfies; 419 bool satisfies;
461 try { 420 try {
462 satisfies = _test(inputEvent); 421 satisfies = _test(inputEvent);
463 } catch (e, s) { 422 } catch (e, s) {
464 _signalError(_asyncError(e, s)); 423 _stream._signalError(_asyncError(e, s));
465 // A failure to return a boolean is considered "not matching". 424 // A failure to return a boolean is considered "not matching".
466 _hasFailed = true; 425 _hasFailed = true;
467 return; 426 return;
468 } 427 }
469 if (!satisfies) { 428 if (!satisfies) {
470 _hasFailed = true; 429 _hasFailed = true;
471 _add(inputEvent); 430 _stream._add(inputEvent);
472 } 431 }
473 } 432 }
474 } 433 }
475 434
476 typedef bool _Equality<T>(T a, T b); 435 typedef bool _Equality<T>(T a, T b);
477 436
478 class DistinctStream<T> extends _ForwardingTransformer<T, T> { 437 class DistinctTransformer<T> extends _ForwardingStreamTransformer<T, T> {
479 static var _SENTINEL = new Object(); 438 static var _SENTINEL = new Object();
480 439
481 _Equality<T> _equals; 440 _Equality<T> _equals;
482 var _previous = _SENTINEL; 441 var _previous = _SENTINEL;
483 442
484 DistinctStream(bool equals(T a, T b)) 443 DistinctTransformer(bool equals(T a, T b))
485 : _equals = equals; 444 : _equals = equals;
486 445
487 void _handleData(T inputEvent) { 446 void _handleData(T inputEvent) {
488 if (identical(_previous, _SENTINEL)) { 447 if (identical(_previous, _SENTINEL)) {
489 _previous = inputEvent; 448 _previous = inputEvent;
490 return _add(inputEvent); 449 return _stream._add(inputEvent);
491 } else { 450 } else {
492 bool isEqual; 451 bool isEqual;
493 try { 452 try {
494 if (_equals == null) { 453 if (_equals == null) {
495 isEqual = (_previous == inputEvent); 454 isEqual = (_previous == inputEvent);
496 } else { 455 } else {
497 isEqual = _equals(_previous, inputEvent); 456 isEqual = _equals(_previous, inputEvent);
498 } 457 }
499 } catch (e, s) { 458 } catch (e, s) {
500 _signalError(_asyncError(e, s)); 459 _stream._signalError(_asyncError(e, s));
501 return null; 460 return null;
502 } 461 }
503 if (!isEqual) { 462 if (!isEqual) {
504 _add(inputEvent); 463 _stream._add(inputEvent);
505 _previous = inputEvent; 464 _previous = inputEvent;
506 } 465 }
507 } 466 }
508 } 467 }
509 } 468 }
OLDNEW
« sdk/lib/async/stream_impl.dart ('K') | « sdk/lib/async/stream_impl.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698