| Index: sdk/lib/async/stream_pipe.dart
|
| diff --git a/sdk/lib/async/stream_pipe.dart b/sdk/lib/async/stream_pipe.dart
|
| index 73ed6b1839fe16d6a5639e60b9c987734e1605c9..6ccb2a67ca23e68820a0bf765622469b2e5ab889 100644
|
| --- a/sdk/lib/async/stream_pipe.dart
|
| +++ b/sdk/lib/async/stream_pipe.dart
|
| @@ -338,7 +338,7 @@ class _TakeStream<T> extends _ForwardingStream<T, T> {
|
| /**
|
| * A [_ForwardingStreamSubscription] with one extra state field.
|
| *
|
| - * Use by several different classes, some storing an integer, others a bool.
|
| + * Use by several different classes, storing an integer, bool or general.
|
| */
|
| class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
|
| // Raw state field. Typed access provided by getters and setters below.
|
| @@ -357,6 +357,11 @@ class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
|
| void set _count(int count) {
|
| _sharedState = count;
|
| }
|
| +
|
| + Object get _value => _sharedState;
|
| + void set _value(Object value) {
|
| + _sharedState = value;
|
| + }
|
| }
|
|
|
| class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
|
| @@ -453,32 +458,41 @@ typedef bool _Equality<T>(T a, T b);
|
| class _DistinctStream<T> extends _ForwardingStream<T, T> {
|
| static var _SENTINEL = new Object();
|
|
|
| - _Equality<T> _equals;
|
| - var _previous = _SENTINEL;
|
| + final _Equality<T> _equals;
|
|
|
| _DistinctStream(Stream<T> source, bool equals(T a, T b))
|
| : _equals = equals,
|
| super(source);
|
|
|
| + StreamSubscription<T> _createSubscription(void onData(T data),
|
| + Function onError, void onDone(), bool cancelOnError) {
|
| + return new _StateStreamSubscription<T>(
|
| + this, onData, onError, onDone, cancelOnError, _SENTINEL);
|
| + }
|
| +
|
| void _handleData(T inputEvent, _EventSink<T> sink) {
|
| - if (identical(_previous, _SENTINEL)) {
|
| - _previous = inputEvent;
|
| - return sink._add(inputEvent);
|
| + _StateStreamSubscription<T> subscription = sink;
|
| + var previous = subscription._value;
|
| + if (identical(previous, _SENTINEL)) {
|
| + // First event.
|
| + subscription._value = inputEvent;
|
| + sink._add(inputEvent);
|
| } else {
|
| + T previousEvent = previous;
|
| bool isEqual;
|
| try {
|
| if (_equals == null) {
|
| - isEqual = (_previous == inputEvent);
|
| + isEqual = (previousEvent == inputEvent);
|
| } else {
|
| - isEqual = _equals(_previous as Object/*=T*/, inputEvent);
|
| + isEqual = _equals(previousEvent, inputEvent);
|
| }
|
| } catch (e, s) {
|
| _addErrorWithReplacement(sink, e, s);
|
| - return null;
|
| + return;
|
| }
|
| if (!isEqual) {
|
| sink._add(inputEvent);
|
| - _previous = inputEvent;
|
| + subscription._value = inputEvent;
|
| }
|
| }
|
| }
|
|
|