OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 /** Runs user code and takes actions depending on success or failure. */ | 7 /** Runs user code and takes actions depending on success or failure. */ |
8 _runUserCode<T>( | 8 _runUserCode<T>( |
9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) { | 9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) { |
10 try { | 10 try { |
(...skipping 320 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
331 // this from source. | 331 // this from source. |
332 sink._close(); | 332 sink._close(); |
333 } | 333 } |
334 } | 334 } |
335 } | 335 } |
336 } | 336 } |
337 | 337 |
338 /** | 338 /** |
339 * A [_ForwardingStreamSubscription] with one extra state field. | 339 * A [_ForwardingStreamSubscription] with one extra state field. |
340 * | 340 * |
341 * Use by several different classes, some storing an integer, others a bool. | 341 * Use by several different classes, storing an integer, bool or general. |
342 */ | 342 */ |
343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { | 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { |
344 // Raw state field. Typed access provided by getters and setters below. | 344 // Raw state field. Typed access provided by getters and setters below. |
345 var _sharedState; | 345 var _sharedState; |
346 | 346 |
347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), | 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), |
348 Function onError, void onDone(), bool cancelOnError, this._sharedState) | 348 Function onError, void onDone(), bool cancelOnError, this._sharedState) |
349 : super(stream, onData, onError, onDone, cancelOnError); | 349 : super(stream, onData, onError, onDone, cancelOnError); |
350 | 350 |
351 bool get _flag => _sharedState; | 351 bool get _flag => _sharedState; |
352 void set _flag(bool flag) { | 352 void set _flag(bool flag) { |
353 _sharedState = flag; | 353 _sharedState = flag; |
354 } | 354 } |
355 | 355 |
356 int get _count => _sharedState; | 356 int get _count => _sharedState; |
357 void set _count(int count) { | 357 void set _count(int count) { |
358 _sharedState = count; | 358 _sharedState = count; |
359 } | 359 } |
| 360 |
| 361 Object get _value => _sharedState; |
| 362 void set _value(Object value) { |
| 363 _sharedState = value; |
| 364 } |
360 } | 365 } |
361 | 366 |
362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { | 367 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { |
363 final _Predicate<T> _test; | 368 final _Predicate<T> _test; |
364 | 369 |
365 _TakeWhileStream(Stream<T> source, bool test(T value)) | 370 _TakeWhileStream(Stream<T> source, bool test(T value)) |
366 : this._test = test, | 371 : this._test = test, |
367 super(source); | 372 super(source); |
368 | 373 |
369 void _handleData(T inputEvent, _EventSink<T> sink) { | 374 void _handleData(T inputEvent, _EventSink<T> sink) { |
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
446 sink._add(inputEvent); | 451 sink._add(inputEvent); |
447 } | 452 } |
448 } | 453 } |
449 } | 454 } |
450 | 455 |
451 typedef bool _Equality<T>(T a, T b); | 456 typedef bool _Equality<T>(T a, T b); |
452 | 457 |
453 class _DistinctStream<T> extends _ForwardingStream<T, T> { | 458 class _DistinctStream<T> extends _ForwardingStream<T, T> { |
454 static var _SENTINEL = new Object(); | 459 static var _SENTINEL = new Object(); |
455 | 460 |
456 _Equality<T> _equals; | 461 final _Equality<T> _equals; |
457 var _previous = _SENTINEL; | |
458 | 462 |
459 _DistinctStream(Stream<T> source, bool equals(T a, T b)) | 463 _DistinctStream(Stream<T> source, bool equals(T a, T b)) |
460 : _equals = equals, | 464 : _equals = equals, |
461 super(source); | 465 super(source); |
462 | 466 |
| 467 StreamSubscription<T> _createSubscription(void onData(T data), |
| 468 Function onError, void onDone(), bool cancelOnError) { |
| 469 return new _StateStreamSubscription<T>( |
| 470 this, onData, onError, onDone, cancelOnError, _SENTINEL); |
| 471 } |
| 472 |
463 void _handleData(T inputEvent, _EventSink<T> sink) { | 473 void _handleData(T inputEvent, _EventSink<T> sink) { |
464 if (identical(_previous, _SENTINEL)) { | 474 _StateStreamSubscription<T> subscription = sink; |
465 _previous = inputEvent; | 475 var previous = subscription._value; |
466 return sink._add(inputEvent); | 476 if (identical(previous, _SENTINEL)) { |
| 477 // First event. |
| 478 subscription._value = inputEvent; |
| 479 sink._add(inputEvent); |
467 } else { | 480 } else { |
| 481 T previousEvent = previous; |
468 bool isEqual; | 482 bool isEqual; |
469 try { | 483 try { |
470 if (_equals == null) { | 484 if (_equals == null) { |
471 isEqual = (_previous == inputEvent); | 485 isEqual = (previousEvent == inputEvent); |
472 } else { | 486 } else { |
473 isEqual = _equals(_previous as Object/*=T*/, inputEvent); | 487 isEqual = _equals(previousEvent, inputEvent); |
474 } | 488 } |
475 } catch (e, s) { | 489 } catch (e, s) { |
476 _addErrorWithReplacement(sink, e, s); | 490 _addErrorWithReplacement(sink, e, s); |
477 return null; | 491 return; |
478 } | 492 } |
479 if (!isEqual) { | 493 if (!isEqual) { |
480 sink._add(inputEvent); | 494 sink._add(inputEvent); |
481 _previous = inputEvent; | 495 subscription._value = inputEvent; |
482 } | 496 } |
483 } | 497 } |
484 } | 498 } |
485 } | 499 } |
OLD | NEW |