Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(38)

Side by Side Diff: sdk/lib/async/stream_pipe.dart

Issue 2885993005: Fix Stream.distinct. (Closed)
Patch Set: Created 3 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/lib/async/stream_distinct_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 /** Runs user code and takes actions depending on success or failure. */ 7 /** Runs user code and takes actions depending on success or failure. */
8 _runUserCode<T>( 8 _runUserCode<T>(
9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) { 9 T userCode(), onSuccess(T value), onError(error, StackTrace stackTrace)) {
10 try { 10 try {
(...skipping 320 matching lines...) Expand 10 before | Expand all | Expand 10 after
331 // this from source. 331 // this from source.
332 sink._close(); 332 sink._close();
333 } 333 }
334 } 334 }
335 } 335 }
336 } 336 }
337 337
338 /** 338 /**
339 * A [_ForwardingStreamSubscription] with one extra state field. 339 * A [_ForwardingStreamSubscription] with one extra state field.
340 * 340 *
341 * Use by several different classes, some storing an integer, others a bool. 341 * Use by several different classes, storing an integer, bool or general.
342 */ 342 */
343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> { 343 class _StateStreamSubscription<T> extends _ForwardingStreamSubscription<T, T> {
344 // Raw state field. Typed access provided by getters and setters below. 344 // Raw state field. Typed access provided by getters and setters below.
345 var _sharedState; 345 var _sharedState;
346 346
347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data), 347 _StateStreamSubscription(_ForwardingStream<T, T> stream, void onData(T data),
348 Function onError, void onDone(), bool cancelOnError, this._sharedState) 348 Function onError, void onDone(), bool cancelOnError, this._sharedState)
349 : super(stream, onData, onError, onDone, cancelOnError); 349 : super(stream, onData, onError, onDone, cancelOnError);
350 350
351 bool get _flag => _sharedState; 351 bool get _flag => _sharedState;
352 void set _flag(bool flag) { 352 void set _flag(bool flag) {
353 _sharedState = flag; 353 _sharedState = flag;
354 } 354 }
355 355
356 int get _count => _sharedState; 356 int get _count => _sharedState;
357 void set _count(int count) { 357 void set _count(int count) {
358 _sharedState = count; 358 _sharedState = count;
359 } 359 }
360
361 Object get _value => _sharedState;
362 void set _value(Object value) {
363 _sharedState = value;
364 }
360 } 365 }
361 366
362 class _TakeWhileStream<T> extends _ForwardingStream<T, T> { 367 class _TakeWhileStream<T> extends _ForwardingStream<T, T> {
363 final _Predicate<T> _test; 368 final _Predicate<T> _test;
364 369
365 _TakeWhileStream(Stream<T> source, bool test(T value)) 370 _TakeWhileStream(Stream<T> source, bool test(T value))
366 : this._test = test, 371 : this._test = test,
367 super(source); 372 super(source);
368 373
369 void _handleData(T inputEvent, _EventSink<T> sink) { 374 void _handleData(T inputEvent, _EventSink<T> sink) {
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
446 sink._add(inputEvent); 451 sink._add(inputEvent);
447 } 452 }
448 } 453 }
449 } 454 }
450 455
451 typedef bool _Equality<T>(T a, T b); 456 typedef bool _Equality<T>(T a, T b);
452 457
453 class _DistinctStream<T> extends _ForwardingStream<T, T> { 458 class _DistinctStream<T> extends _ForwardingStream<T, T> {
454 static var _SENTINEL = new Object(); 459 static var _SENTINEL = new Object();
455 460
456 _Equality<T> _equals; 461 final _Equality<T> _equals;
457 var _previous = _SENTINEL;
458 462
459 _DistinctStream(Stream<T> source, bool equals(T a, T b)) 463 _DistinctStream(Stream<T> source, bool equals(T a, T b))
460 : _equals = equals, 464 : _equals = equals,
461 super(source); 465 super(source);
462 466
467 StreamSubscription<T> _createSubscription(void onData(T data),
468 Function onError, void onDone(), bool cancelOnError) {
469 return new _StateStreamSubscription<T>(
470 this, onData, onError, onDone, cancelOnError, _SENTINEL);
471 }
472
463 void _handleData(T inputEvent, _EventSink<T> sink) { 473 void _handleData(T inputEvent, _EventSink<T> sink) {
464 if (identical(_previous, _SENTINEL)) { 474 _StateStreamSubscription<T> subscription = sink;
465 _previous = inputEvent; 475 var previous = subscription._value;
466 return sink._add(inputEvent); 476 if (identical(previous, _SENTINEL)) {
477 // First event.
478 subscription._value = inputEvent;
479 sink._add(inputEvent);
467 } else { 480 } else {
481 T previousEvent = previous;
468 bool isEqual; 482 bool isEqual;
469 try { 483 try {
470 if (_equals == null) { 484 if (_equals == null) {
471 isEqual = (_previous == inputEvent); 485 isEqual = (previousEvent == inputEvent);
472 } else { 486 } else {
473 isEqual = _equals(_previous as Object/*=T*/, inputEvent); 487 isEqual = _equals(previousEvent, inputEvent);
474 } 488 }
475 } catch (e, s) { 489 } catch (e, s) {
476 _addErrorWithReplacement(sink, e, s); 490 _addErrorWithReplacement(sink, e, s);
477 return null; 491 return;
478 } 492 }
479 if (!isEqual) { 493 if (!isEqual) {
480 sink._add(inputEvent); 494 sink._add(inputEvent);
481 _previous = inputEvent; 495 subscription._value = inputEvent;
482 } 496 }
483 } 497 }
484 } 498 }
485 } 499 }
OLDNEW
« no previous file with comments | « sdk/lib/async/stream.dart ('k') | tests/lib/async/stream_distinct_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698