Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(133)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Runs user code and takes actions depending on success or failure. */ 7 /** Runs user code and takes actions depending on success or failure. */
8 _runUserCode(userCode(), 8 _runUserCode(
9 onSuccess(value), 9 userCode(), onSuccess(value), onError(error, StackTrace stackTrace)) {
10 onError(error, StackTrace stackTrace)) {
11 try { 10 try {
12 onSuccess(userCode()); 11 onSuccess(userCode());
13 } catch (e, s) { 12 } catch (e, s) {
14 AsyncError replacement = Zone.current.errorCallback(e, s); 13 AsyncError replacement = Zone.current.errorCallback(e, s);
15 if (replacement == null) { 14 if (replacement == null) {
16 onError(e, s); 15 onError(e, s);
17 } else { 16 } else {
18 var error = _nonNullError(replacement.error); 17 var error = _nonNullError(replacement.error);
19 var stackTrace = replacement.stackTrace; 18 var stackTrace = replacement.stackTrace;
20 onError(error, stackTrace); 19 onError(error, stackTrace);
21 } 20 }
22 } 21 }
23 } 22 }
24 23
25 /** Helper function to cancel a subscription and wait for the potential future, 24 /** Helper function to cancel a subscription and wait for the potential future,
26 before completing with an error. */ 25 before completing with an error. */
27 void _cancelAndError(StreamSubscription subscription, 26 void _cancelAndError(StreamSubscription subscription, _Future future, error,
28 _Future future, 27 StackTrace stackTrace) {
29 error,
30 StackTrace stackTrace) {
31 var cancelFuture = subscription.cancel(); 28 var cancelFuture = subscription.cancel();
32 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { 29 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
33 cancelFuture.whenComplete(() => future._completeError(error, stackTrace)); 30 cancelFuture.whenComplete(() => future._completeError(error, stackTrace));
34 } else { 31 } else {
35 future._completeError(error, stackTrace); 32 future._completeError(error, stackTrace);
36 } 33 }
37 } 34 }
38 35
39 void _cancelAndErrorWithReplacement(StreamSubscription subscription, 36 void _cancelAndErrorWithReplacement(StreamSubscription subscription,
40 _Future future, 37 _Future future, error, StackTrace stackTrace) {
41 error, StackTrace stackTrace) {
42 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); 38 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
43 if (replacement != null) { 39 if (replacement != null) {
44 error = _nonNullError(replacement.error); 40 error = _nonNullError(replacement.error);
45 stackTrace = replacement.stackTrace; 41 stackTrace = replacement.stackTrace;
46 } 42 }
47 _cancelAndError(subscription, future, error, stackTrace); 43 _cancelAndError(subscription, future, error, stackTrace);
48 } 44 }
49 45
50 typedef void _ErrorCallback(error, StackTrace stackTrace); 46 typedef void _ErrorCallback(error, StackTrace stackTrace);
51 47
52 /** Helper function to make an onError argument to [_runUserCode]. */ 48 /** Helper function to make an onError argument to [_runUserCode]. */
53 _ErrorCallback _cancelAndErrorClosure( 49 _ErrorCallback _cancelAndErrorClosure(
54 StreamSubscription subscription, _Future future) { 50 StreamSubscription subscription, _Future future) {
55 return (error, StackTrace stackTrace) { 51 return (error, StackTrace stackTrace) {
56 _cancelAndError(subscription, future, error, stackTrace); 52 _cancelAndError(subscription, future, error, stackTrace);
57 }; 53 };
58 } 54 }
59 55
60 /** Helper function to cancel a subscription and wait for the potential future, 56 /** Helper function to cancel a subscription and wait for the potential future,
61 before completing with a value. */ 57 before completing with a value. */
62 void _cancelAndValue(StreamSubscription subscription, _Future future, value) { 58 void _cancelAndValue(StreamSubscription subscription, _Future future, value) {
63 var cancelFuture = subscription.cancel(); 59 var cancelFuture = subscription.cancel();
64 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) { 60 if (cancelFuture is Future && !identical(cancelFuture, Future._nullFuture)) {
65 cancelFuture.whenComplete(() => future._complete(value)); 61 cancelFuture.whenComplete(() => future._complete(value));
66 } else { 62 } else {
67 future._complete(value); 63 future._complete(value);
68 } 64 }
69 } 65 }
70 66
71
72 /** 67 /**
73 * A [Stream] that forwards subscriptions to another stream. 68 * A [Stream] that forwards subscriptions to another stream.
74 * 69 *
75 * This stream implements [Stream], but forwards all subscriptions 70 * This stream implements [Stream], but forwards all subscriptions
76 * to an underlying stream, and wraps the returned subscription to 71 * to an underlying stream, and wraps the returned subscription to
77 * modify the events on the way. 72 * modify the events on the way.
78 * 73 *
79 * This class is intended for internal use only. 74 * This class is intended for internal use only.
80 */ 75 */
81 abstract class _ForwardingStream<S, T> extends Stream<T> { 76 abstract class _ForwardingStream<S, T> extends Stream<T> {
82 final Stream<S> _source; 77 final Stream<S> _source;
83 78
84 _ForwardingStream(this._source); 79 _ForwardingStream(this._source);
85 80
86 bool get isBroadcast => _source.isBroadcast; 81 bool get isBroadcast => _source.isBroadcast;
87 82
88 StreamSubscription<T> listen(void onData(T value), 83 StreamSubscription<T> listen(void onData(T value),
89 { Function onError, 84 {Function onError, void onDone(), bool cancelOnError}) {
90 void onDone(),
91 bool cancelOnError }) {
92 cancelOnError = identical(true, cancelOnError); 85 cancelOnError = identical(true, cancelOnError);
93 return _createSubscription(onData, onError, onDone, cancelOnError); 86 return _createSubscription(onData, onError, onDone, cancelOnError);
94 } 87 }
95 88
96 StreamSubscription<T> _createSubscription( 89 StreamSubscription<T> _createSubscription(void onData(T data),
97 void onData(T data), 90 Function onError, void onDone(), bool cancelOnError) {
98 Function onError,
99 void onDone(),
100 bool cancelOnError) {
101 return new _ForwardingStreamSubscription<S, T>( 91 return new _ForwardingStreamSubscription<S, T>(
102 this, onData, onError, onDone, cancelOnError); 92 this, onData, onError, onDone, cancelOnError);
103 } 93 }
104 94
105 // Override the following methods in subclasses to change the behavior. 95 // Override the following methods in subclasses to change the behavior.
106 96
107 void _handleData(S data, _EventSink<T> sink) { 97 void _handleData(S data, _EventSink<T> sink) {
108 sink._add(data as Object /*=T*/); 98 sink._add(data as Object/*=T*/);
109 } 99 }
110 100
111 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) { 101 void _handleError(error, StackTrace stackTrace, _EventSink<T> sink) {
112 sink._addError(error, stackTrace); 102 sink._addError(error, stackTrace);
113 } 103 }
114 104
115 void _handleDone(_EventSink<T> sink) { 105 void _handleDone(_EventSink<T> sink) {
116 sink._close(); 106 sink._close();
117 } 107 }
118 } 108 }
119 109
120 /** 110 /**
121 * Abstract superclass for subscriptions that forward to other subscriptions. 111 * Abstract superclass for subscriptions that forward to other subscriptions.
122 */ 112 */
123 class _ForwardingStreamSubscription<S, T> 113 class _ForwardingStreamSubscription<S, T>
124 extends _BufferingStreamSubscription<T> { 114 extends _BufferingStreamSubscription<T> {
125 final _ForwardingStream<S, T> _stream; 115 final _ForwardingStream<S, T> _stream;
126 116
127 StreamSubscription<S> _subscription; 117 StreamSubscription<S> _subscription;
128 118
129 _ForwardingStreamSubscription(this._stream, void onData(T data), 119 _ForwardingStreamSubscription(this._stream, void onData(T data),
130 Function onError, void onDone(), 120 Function onError, void onDone(), bool cancelOnError)
131 bool cancelOnError)
132 : super(onData, onError, onDone, cancelOnError) { 121 : super(onData, onError, onDone, cancelOnError) {
133 _subscription = _stream._source.listen(_handleData, 122 _subscription = _stream._source
134 onError: _handleError, 123 .listen(_handleData, onError: _handleError, onDone: _handleDone);
135 onDone: _handleDone);
136 } 124 }
137 125
138 // _StreamSink interface. 126 // _StreamSink interface.
139 // Transformers sending more than one event have no way to know if the stream 127 // Transformers sending more than one event have no way to know if the stream
140 // is canceled or closed after the first, so we just ignore remaining events. 128 // is canceled or closed after the first, so we just ignore remaining events.
141 129
142 void _add(T data) { 130 void _add(T data) {
143 if (_isClosed) return; 131 if (_isClosed) return;
144 super._add(data); 132 super._add(data);
145 } 133 }
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
193 181
194 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) { 182 void _addErrorWithReplacement(_EventSink sink, error, stackTrace) {
195 AsyncError replacement = Zone.current.errorCallback(error, stackTrace); 183 AsyncError replacement = Zone.current.errorCallback(error, stackTrace);
196 if (replacement != null) { 184 if (replacement != null) {
197 error = _nonNullError(replacement.error); 185 error = _nonNullError(replacement.error);
198 stackTrace = replacement.stackTrace; 186 stackTrace = replacement.stackTrace;
199 } 187 }
200 sink._addError(error, stackTrace); 188 sink._addError(error, stackTrace);
201 } 189 }
202 190
203
204 class _WhereStream<T> extends _ForwardingStream<T, T> { 191 class _WhereStream<T> extends _ForwardingStream<T, T> {
205 final _Predicate<T> _test; 192 final _Predicate<T> _test;
206 193
207 _WhereStream(Stream<T> source, bool test(T value)) 194 _WhereStream(Stream<T> source, bool test(T value))
208 : _test = test, super(source); 195 : _test = test,
196 super(source);
209 197
210 void _handleData(T inputEvent, _EventSink<T> sink) { 198 void _handleData(T inputEvent, _EventSink<T> sink) {
211 bool satisfies; 199 bool satisfies;
212 try { 200 try {
213 satisfies = _test(inputEvent); 201 satisfies = _test(inputEvent);
214 } catch (e, s) { 202 } catch (e, s) {
215 _addErrorWithReplacement(sink, e, s); 203 _addErrorWithReplacement(sink, e, s);
216 return; 204 return;
217 } 205 }
218 if (satisfies) { 206 if (satisfies) {
219 sink._add(inputEvent); 207 sink._add(inputEvent);
220 } 208 }
221 } 209 }
222 } 210 }
223 211
224
225 typedef T _Transformation<S, T>(S value); 212 typedef T _Transformation<S, T>(S value);
226 213
227 /** 214 /**
228 * A stream pipe that converts data events before passing them on. 215 * A stream pipe that converts data events before passing them on.
229 */ 216 */
230 class _MapStream<S, T> extends _ForwardingStream<S, T> { 217 class _MapStream<S, T> extends _ForwardingStream<S, T> {
231 final _Transformation<S, T> _transform; 218 final _Transformation<S, T> _transform;
232 219
233 _MapStream(Stream<S> source, T transform(S event)) 220 _MapStream(Stream<S> source, T transform(S event))
234 : this._transform = transform, super(source); 221 : this._transform = transform,
222 super(source);
235 223
236 void _handleData(S inputEvent, _EventSink<T> sink) { 224 void _handleData(S inputEvent, _EventSink<T> sink) {
237 T outputEvent; 225 T outputEvent;
238 try { 226 try {
239 outputEvent = _transform(inputEvent); 227 outputEvent = _transform(inputEvent);
240 } catch (e, s) { 228 } catch (e, s) {
241 _addErrorWithReplacement(sink, e, s); 229 _addErrorWithReplacement(sink, e, s);
242 return; 230 return;
243 } 231 }
244 sink._add(outputEvent); 232 sink._add(outputEvent);
245 } 233 }
246 } 234 }
247 235
248 /** 236 /**
249 * A stream pipe that converts data events before passing them on. 237 * A stream pipe that converts data events before passing them on.
250 */ 238 */
251 class _ExpandStream<S, T> extends _ForwardingStream<S, T> { 239 class _ExpandStream<S, T> extends _ForwardingStream<S, T> {
252 final _Transformation<S, Iterable<T>> _expand; 240 final _Transformation<S, Iterable<T>> _expand;
253 241
254 _ExpandStream(Stream<S> source, Iterable<T> expand(S event)) 242 _ExpandStream(Stream<S> source, Iterable<T> expand(S event))
255 : this._expand = expand, super(source); 243 : this._expand = expand,
244 super(source);
256 245
257 void _handleData(S inputEvent, _EventSink<T> sink) { 246 void _handleData(S inputEvent, _EventSink<T> sink) {
258 try { 247 try {
259 for (T value in _expand(inputEvent)) { 248 for (T value in _expand(inputEvent)) {
260 sink._add(value); 249 sink._add(value);
261 } 250 }
262 } catch (e, s) { 251 } catch (e, s) {
263 // If either _expand or iterating the generated iterator throws, 252 // If either _expand or iterating the generated iterator throws,
264 // we abort the iteration. 253 // we abort the iteration.
265 _addErrorWithReplacement(sink, e, s); 254 _addErrorWithReplacement(sink, e, s);
266 } 255 }
267 } 256 }
268 } 257 }
269 258
270
271 typedef bool _ErrorTest(error); 259 typedef bool _ErrorTest(error);
272 260
273 /** 261 /**
274 * A stream pipe that converts or disposes error events 262 * A stream pipe that converts or disposes error events
275 * before passing them on. 263 * before passing them on.
276 */ 264 */
277 class _HandleErrorStream<T> extends _ForwardingStream<T, T> { 265 class _HandleErrorStream<T> extends _ForwardingStream<T, T> {
278 final Function _transform; 266 final Function _transform;
279 final _ErrorTest _test; 267 final _ErrorTest _test;
280 268
281 _HandleErrorStream(Stream<T> source, 269 _HandleErrorStream(Stream<T> source, Function onError, bool test(error))
282 Function onError, 270 : this._transform = onError,
283 bool test(error)) 271 this._test = test,
284 : this._transform = onError, this._test = test, super(source); 272 super(source);
285 273
286 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) { 274 void _handleError(Object error, StackTrace stackTrace, _EventSink<T> sink) {
287 bool matches = true; 275 bool matches = true;
288 if (_test != null) { 276 if (_test != null) {
289 try { 277 try {
290 matches = _test(error); 278 matches = _test(error);
291 } catch (e, s) { 279 } catch (e, s) {
292 _addErrorWithReplacement(sink, e, s); 280 _addErrorWithReplacement(sink, e, s);
293 return; 281 return;
294 } 282 }
295 } 283 }
296 if (matches) { 284 if (matches) {
297 try { 285 try {
298 _invokeErrorHandler(_transform, error, stackTrace); 286 _invokeErrorHandler(_transform, error, stackTrace);
299 } catch (e, s) { 287 } catch (e, s) {
300 if (identical(e, error)) { 288 if (identical(e, error)) {
301 sink._addError(error, stackTrace); 289 sink._addError(error, stackTrace);
302 } else { 290 } else {
303 _addErrorWithReplacement(sink, e, s); 291 _addErrorWithReplacement(sink, e, s);
304 } 292 }
305 return; 293 return;
306 } 294 }
307 } else { 295 } else {
308 sink._addError(error, stackTrace); 296 sink._addError(error, stackTrace);
309 } 297 }
310 } 298 }
311 } 299 }
312 300
313
314 class _TakeStream<T> extends _ForwardingStream<T, T> { 301 class _TakeStream<T> extends _ForwardingStream<T, T> {
315 final int _count; 302 final int _count;
316 303
317 _TakeStream(Stream<T> source, int count) 304 _TakeStream(Stream<T> source, int count)
318 : this._count = count, super(source) { 305 : this._count = count,
306 super(source) {
319 // This test is done early to avoid handling an async error 307 // This test is done early to avoid handling an async error
320 // in the _handleData method. 308 // in the _handleData method.
321 if (count is! int) throw new ArgumentError(count); 309 if (count is! int) throw new ArgumentError(count);
322 } 310 }
323 311
324 StreamSubscription<T> _createSubscription( 312 StreamSubscription<T> _createSubscription(void onData(T data),
325 void onData(T data), 313 Function onError, void onDone(), bool cancelOnError) {
326 Function onError,
327 void onDone(),
328 bool cancelOnError) {
329 if (_count == 0) { 314 if (_count == 0) {
330 _source.listen(null).cancel(); 315 _source.listen(null).cancel();
331 return new _DoneStreamSubscription<T>(onDone); 316 return new _DoneStreamSubscription<T>(onDone);
332 } 317 }
333 return new _StateStreamSubscription<T>( 318 return new _StateStreamSubscription<T>(
334 this, onData, onError, onDone, cancelOnError, _count); 319 this, onData, onError, onDone, cancelOnError, _count);
335 } 320 }
336 321
337 void _handleData(T inputEvent, _EventSink<T> sink) { 322 void _handleData(T inputEvent, _EventSink<T> sink) {
338 _StateStreamSubscription<T> subscription = sink; 323 _StateStreamSubscription<T> subscription = sink;
(...skipping 14 matching lines...) Expand all
353 /** 338 /**
354 * A [_ForwardingStreamSubscription] with one extra state field. 339 * A [_ForwardingStreamSubscription] with one extra state field.
355 * 340 *
356 * Use by several different classes, some storing an integer, others a bool. 341 * Use by several different classes, some storing an integer, others a bool.
357 */ 342 */
358 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
359 // Raw state field. Typed access provided by getters and setters below. 344 // Raw state field. Typed access provided by getters and setters below.
360 var _sharedState; 345 var _sharedState;
361 346
362 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
363 Function onError, void onDone(), 348 Function onError, void onDone(), bool cancelOnError, this._sharedState)
364 bool cancelOnError, this._sharedState)
365 : super(stream, onData, onError, onDone, cancelOnError); 349 : super(stream, onData, onError, onDone, cancelOnError);
366 350
367 bool get _flag => _sharedState; 351 bool get _flag => _sharedState;
368 void set _flag(bool flag) { _sharedState = flag; } 352 void set _flag(bool flag) {
353 _sharedState = flag;
354 }
355
369 int get _count => _sharedState; 356 int get _count => _sharedState;
370 void set _count(int count) { _sharedState = count; } 357 void set _count(int count) {
358 _sharedState = count;
359 }
371 } 360 }
372 361
373
374 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
375 final _Predicate<T> _test; 363 final _Predicate<T> _test;
376 364
377 _TakeWhileStream(Stream<T> source, bool test(T value)) 365 _TakeWhileStream(Stream<T> source, bool test(T value))
378 : this._test = test, super(source); 366 : this._test = test,
367 super(source);
379 368
380 void _handleData(T inputEvent, _EventSink<T> sink) { 369 void _handleData(T inputEvent, _EventSink<T> sink) {
381 bool satisfies; 370 bool satisfies;
382 try { 371 try {
383 satisfies = _test(inputEvent); 372 satisfies = _test(inputEvent);
384 } catch (e, s) { 373 } catch (e, s) {
385 _addErrorWithReplacement(sink, e, s); 374 _addErrorWithReplacement(sink, e, s);
386 // The test didn't say true. Didn't say false either, but we stop anyway. 375 // The test didn't say true. Didn't say false either, but we stop anyway.
387 sink._close(); 376 sink._close();
388 return; 377 return;
389 } 378 }
390 if (satisfies) { 379 if (satisfies) {
391 sink._add(inputEvent); 380 sink._add(inputEvent);
392 } else { 381 } else {
393 sink._close(); 382 sink._close();
394 } 383 }
395 } 384 }
396 } 385 }
397 386
398 class _SkipStream<T> extends _ForwardingStream<T, T> { 387 class _SkipStream<T> extends _ForwardingStream<T, T> {
399 final int _count; 388 final int _count;
400 389
401 _SkipStream(Stream<T> source, int count) 390 _SkipStream(Stream<T> source, int count)
402 : this._count = count, super(source) { 391 : this._count = count,
392 super(source) {
403 // This test is done early to avoid handling an async error 393 // This test is done early to avoid handling an async error
404 // in the _handleData method. 394 // in the _handleData method.
405 if (count is! int || count < 0) throw new ArgumentError(count); 395 if (count is! int || count < 0) throw new ArgumentError(count);
406 } 396 }
407 397
408 StreamSubscription<T> _createSubscription( 398 StreamSubscription<T> _createSubscription(void onData(T data),
409 void onData(T data), 399 Function onError, void onDone(), bool cancelOnError) {
410 Function onError,
411 void onDone(),
412 bool cancelOnError) {
413 return new _StateStreamSubscription<T>( 400 return new _StateStreamSubscription<T>(
414 this, onData, onError, onDone, cancelOnError, _count); 401 this, onData, onError, onDone, cancelOnError, _count);
415 } 402 }
416 403
417 void _handleData(T inputEvent, _EventSink<T> sink) { 404 void _handleData(T inputEvent, _EventSink<T> sink) {
418 _StateStreamSubscription<T> subscription = sink; 405 _StateStreamSubscription<T> subscription = sink;
419 int count = subscription._count; 406 int count = subscription._count;
420 if (count > 0) { 407 if (count > 0) {
421 subscription._count = count - 1; 408 subscription._count = count - 1;
422 return; 409 return;
423 } 410 }
424 sink._add(inputEvent); 411 sink._add(inputEvent);
425 } 412 }
426 } 413 }
427 414
428 class _SkipWhileStream<T> extends _ForwardingStream<T, T> { 415 class _SkipWhileStream<T> extends _ForwardingStream<T, T> {
429 final _Predicate<T> _test; 416 final _Predicate<T> _test;
430 417
431 _SkipWhileStream(Stream<T> source, bool test(T value)) 418 _SkipWhileStream(Stream<T> source, bool test(T value))
432 : this._test = test, super(source); 419 : this._test = test,
420 super(source);
433 421
434 StreamSubscription<T> _createSubscription( 422 StreamSubscription<T> _createSubscription(void onData(T data),
435 void onData(T data), 423 Function onError, void onDone(), bool cancelOnError) {
436 Function onError,
437 void onDone(),
438 bool cancelOnError) {
439 return new _StateStreamSubscription<T>( 424 return new _StateStreamSubscription<T>(
440 this, onData, onError, onDone, cancelOnError, false); 425 this, onData, onError, onDone, cancelOnError, false);
441 } 426 }
442 427
443 void _handleData(T inputEvent, _EventSink<T> sink) { 428 void _handleData(T inputEvent, _EventSink<T> sink) {
444 _StateStreamSubscription<T> subscription = sink; 429 _StateStreamSubscription<T> subscription = sink;
445 bool hasFailed = subscription._flag; 430 bool hasFailed = subscription._flag;
446 if (hasFailed) { 431 if (hasFailed) {
447 sink._add(inputEvent); 432 sink._add(inputEvent);
448 return; 433 return;
(...skipping 16 matching lines...) Expand all
465 450
466 typedef bool _Equality<T>(T a, T b); 451 typedef bool _Equality<T>(T a, T b);
467 452
468 class _DistinctStream<T> extends _ForwardingStream<T, T> { 453 class _DistinctStream<T> extends _ForwardingStream<T, T> {
469 static var _SENTINEL = new Object(); 454 static var _SENTINEL = new Object();
470 455
471 _Equality<T> _equals; 456 _Equality<T> _equals;
472 var _previous = _SENTINEL; 457 var _previous = _SENTINEL;
473 458
474 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 459 _DistinctStream(Stream<T> source, bool equals(T a, T b))
475 : _equals = equals, super(source); 460 : _equals = equals,
461 super(source);
476 462
477 void _handleData(T inputEvent, _EventSink<T> sink) { 463 void _handleData(T inputEvent, _EventSink<T> sink) {
478 if (identical(_previous, _SENTINEL)) { 464 if (identical(_previous, _SENTINEL)) {
479 _previous = inputEvent; 465 _previous = inputEvent;
480 return sink._add(inputEvent); 466 return sink._add(inputEvent);
481 } else { 467 } else {
482 bool isEqual; 468 bool isEqual;
483 try { 469 try {
484 if (_equals == null) { 470 if (_equals == null) {
485 isEqual = (_previous == inputEvent); 471 isEqual = (_previous == inputEvent);
486 } else { 472 } else {
487 isEqual = _equals(_previous as Object /*=T*/, inputEvent); 473 isEqual = _equals(_previous as Object/*=T*/, inputEvent);
488 } 474 }
489 } catch (e, s) { 475 } catch (e, s) {
490 _addErrorWithReplacement(sink, e, s); 476 _addErrorWithReplacement(sink, e, s);
491 return null; 477 return null;
492 } 478 }
493 if (!isEqual) { 479 if (!isEqual) {
494 sink._add(inputEvent); 480 sink._add(inputEvent);
495 _previous = inputEvent; 481 _previous = inputEvent;
496 } 482 }
497 } 483 }
498 } 484 }
499 } 485 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream_impl.dart ('k') | sdk/lib/async/stream_transformers.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698