OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
167 * Adds a subscription to this stream. | 167 * Adds a subscription to this stream. |
168 * | 168 * |
169 * On each data event from this stream, the subscriber's [onData] handler | 169 * On each data event from this stream, the subscriber's [onData] handler |
170 * is called. If [onData] is null, nothing happens. | 170 * is called. If [onData] is null, nothing happens. |
171 * | 171 * |
172 * On errors from this stream, the [onError] handler is given a | 172 * On errors from this stream, the [onError] handler is given a |
173 * [AsyncError] object describing the error. | 173 * [AsyncError] object describing the error. |
174 * | 174 * |
175 * If this stream closes, the [onDone] handler is called. | 175 * If this stream closes, the [onDone] handler is called. |
176 * | 176 * |
177 * If [unsubscribeOnError] is true, the subscription is ended when | 177 * If [cancelOnError] is true, the subscription is ended when |
178 * the first error is reported. The default is false. | 178 * the first error is reported. The default is false. |
179 */ | 179 */ |
180 StreamSubscription<T> listen(void onData(T event), | 180 StreamSubscription<T> listen(void onData(T event), |
181 { void onError(AsyncError error), | 181 { void onError(AsyncError error), |
182 void onDone(), | 182 void onDone(), |
183 bool unsubscribeOnError}); | 183 bool cancelOnError}); |
184 | 184 |
185 /** | 185 /** |
186 * Creates a new stream from this stream that discards some data events. | 186 * Creates a new stream from this stream that discards some data events. |
187 * | 187 * |
188 * The new stream sends the same error and done events as this stream, | 188 * The new stream sends the same error and done events as this stream, |
189 * but it only sends the data events that satisfy the [test]. | 189 * but it only sends the data events that satisfy the [test]. |
190 */ | 190 */ |
191 Stream<T> where(bool test(T event)) { | 191 Stream<T> where(bool test(T event)) { |
192 return new _WhereStream<T>(this, test); | 192 return new _WhereStream<T>(this, test); |
193 } | 193 } |
(...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
271 } | 271 } |
272 }, | 272 }, |
273 onError: result._setError, | 273 onError: result._setError, |
274 onDone: () { | 274 onDone: () { |
275 if (!seenFirst) { | 275 if (!seenFirst) { |
276 result._setError(new AsyncError(new StateError("No elements"))); | 276 result._setError(new AsyncError(new StateError("No elements"))); |
277 } else { | 277 } else { |
278 result._setValue(value); | 278 result._setValue(value); |
279 } | 279 } |
280 }, | 280 }, |
281 unsubscribeOnError: true | 281 cancelOnError: true |
282 ); | 282 ); |
283 return result; | 283 return result; |
284 } | 284 } |
285 | 285 |
286 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
287 Future fold(var initialValue, combine(var previous, T element)) { | 287 Future fold(var initialValue, combine(var previous, T element)) { |
288 _FutureImpl result = new _FutureImpl(); | 288 _FutureImpl result = new _FutureImpl(); |
289 var value = initialValue; | 289 var value = initialValue; |
290 StreamSubscription subscription; | 290 StreamSubscription subscription; |
291 subscription = this.listen( | 291 subscription = this.listen( |
292 // TODO(ahe): Restore type when feature is implemented in dart2js | 292 // TODO(ahe): Restore type when feature is implemented in dart2js |
293 // checked mode. http://dartbug.com/7733 | 293 // checked mode. http://dartbug.com/7733 |
294 (/*T*/ element) { | 294 (/*T*/ element) { |
295 _runUserCode( | 295 _runUserCode( |
296 () => combine(value, element), | 296 () => combine(value, element), |
297 (newValue) { value = newValue; }, | 297 (newValue) { value = newValue; }, |
298 _cancelAndError(subscription, result) | 298 _cancelAndError(subscription, result) |
299 ); | 299 ); |
300 }, | 300 }, |
301 onError: (AsyncError e) { | 301 onError: (AsyncError e) { |
302 result._setError(e); | 302 result._setError(e); |
303 }, | 303 }, |
304 onDone: () { | 304 onDone: () { |
305 result._setValue(value); | 305 result._setValue(value); |
306 }, | 306 }, |
307 unsubscribeOnError: true); | 307 cancelOnError: true); |
308 return result; | 308 return result; |
309 } | 309 } |
310 | 310 |
311 /** | 311 /** |
312 * Checks whether [match] occurs in the elements provided by this stream. | 312 * Checks whether [match] occurs in the elements provided by this stream. |
313 * | 313 * |
314 * Completes the [Future] when the answer is known. | 314 * Completes the [Future] when the answer is known. |
315 * If this stream reports an error, the [Future] will report that error. | 315 * If this stream reports an error, the [Future] will report that error. |
316 */ | 316 */ |
317 Future<bool> contains(T match) { | 317 Future<bool> contains(T match) { |
(...skipping 11 matching lines...) Expand all Loading... | |
329 future._setValue(true); | 329 future._setValue(true); |
330 } | 330 } |
331 }, | 331 }, |
332 _cancelAndError(subscription, future) | 332 _cancelAndError(subscription, future) |
333 ); | 333 ); |
334 }, | 334 }, |
335 onError: future._setError, | 335 onError: future._setError, |
336 onDone: () { | 336 onDone: () { |
337 future._setValue(false); | 337 future._setValue(false); |
338 }, | 338 }, |
339 unsubscribeOnError: true); | 339 cancelOnError: true); |
340 return future; | 340 return future; |
341 } | 341 } |
342 | 342 |
343 /** | 343 /** |
344 * Checks whether [test] accepts all elements provided by this stream. | 344 * Checks whether [test] accepts all elements provided by this stream. |
345 * | 345 * |
346 * Completes the [Future] when the answer is known. | 346 * Completes the [Future] when the answer is known. |
347 * If this stream reports an error, the [Future] will report that error. | 347 * If this stream reports an error, the [Future] will report that error. |
348 */ | 348 */ |
349 Future<bool> every(bool test(T element)) { | 349 Future<bool> every(bool test(T element)) { |
(...skipping 11 matching lines...) Expand all Loading... | |
361 future._setValue(false); | 361 future._setValue(false); |
362 } | 362 } |
363 }, | 363 }, |
364 _cancelAndError(subscription, future) | 364 _cancelAndError(subscription, future) |
365 ); | 365 ); |
366 }, | 366 }, |
367 onError: future._setError, | 367 onError: future._setError, |
368 onDone: () { | 368 onDone: () { |
369 future._setValue(true); | 369 future._setValue(true); |
370 }, | 370 }, |
371 unsubscribeOnError: true); | 371 cancelOnError: true); |
372 return future; | 372 return future; |
373 } | 373 } |
374 | 374 |
375 /** | 375 /** |
376 * Checks whether [test] accepts any element provided by this stream. | 376 * Checks whether [test] accepts any element provided by this stream. |
377 * | 377 * |
378 * Completes the [Future] when the answer is known. | 378 * Completes the [Future] when the answer is known. |
379 * If this stream reports an error, the [Future] will report that error. | 379 * If this stream reports an error, the [Future] will report that error. |
380 */ | 380 */ |
381 Future<bool> any(bool test(T element)) { | 381 Future<bool> any(bool test(T element)) { |
(...skipping 11 matching lines...) Expand all Loading... | |
393 future._setValue(true); | 393 future._setValue(true); |
394 } | 394 } |
395 }, | 395 }, |
396 _cancelAndError(subscription, future) | 396 _cancelAndError(subscription, future) |
397 ); | 397 ); |
398 }, | 398 }, |
399 onError: future._setError, | 399 onError: future._setError, |
400 onDone: () { | 400 onDone: () { |
401 future._setValue(false); | 401 future._setValue(false); |
402 }, | 402 }, |
403 unsubscribeOnError: true); | 403 cancelOnError: true); |
404 return future; | 404 return future; |
405 } | 405 } |
406 | 406 |
407 | 407 |
408 /** Counts the elements in the stream. */ | 408 /** Counts the elements in the stream. */ |
409 Future<int> get length { | 409 Future<int> get length { |
410 _FutureImpl<int> future = new _FutureImpl<int>(); | 410 _FutureImpl<int> future = new _FutureImpl<int>(); |
411 int count = 0; | 411 int count = 0; |
412 this.listen( | 412 this.listen( |
413 (_) { count++; }, | 413 (_) { count++; }, |
414 onError: future._setError, | 414 onError: future._setError, |
415 onDone: () { | 415 onDone: () { |
416 future._setValue(count); | 416 future._setValue(count); |
417 }, | 417 }, |
418 unsubscribeOnError: true); | 418 cancelOnError: true); |
419 return future; | 419 return future; |
420 } | 420 } |
421 | 421 |
422 /** Reports whether this stream contains any elements. */ | 422 /** Reports whether this stream contains any elements. */ |
423 Future<bool> get isEmpty { | 423 Future<bool> get isEmpty { |
424 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 424 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
425 StreamSubscription subscription; | 425 StreamSubscription subscription; |
426 subscription = this.listen( | 426 subscription = this.listen( |
427 (_) { | 427 (_) { |
428 subscription.cancel(); | 428 subscription.cancel(); |
429 future._setValue(false); | 429 future._setValue(false); |
430 }, | 430 }, |
431 onError: future._setError, | 431 onError: future._setError, |
432 onDone: () { | 432 onDone: () { |
433 future._setValue(true); | 433 future._setValue(true); |
434 }, | 434 }, |
435 unsubscribeOnError: true); | 435 cancelOnError: true); |
436 return future; | 436 return future; |
437 } | 437 } |
438 | 438 |
439 /** Collects the data of this stream in a [List]. */ | 439 /** Collects the data of this stream in a [List]. */ |
440 Future<List<T>> toList() { | 440 Future<List<T>> toList() { |
441 List<T> result = <T>[]; | 441 List<T> result = <T>[]; |
442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); | 442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
443 this.listen( | 443 this.listen( |
444 // TODO(ahe): Restore type when feature is implemented in dart2js | 444 // TODO(ahe): Restore type when feature is implemented in dart2js |
445 // checked mode. http://dartbug.com/7733 | 445 // checked mode. http://dartbug.com/7733 |
446 (/*T*/ data) { | 446 (/*T*/ data) { |
447 result.add(data); | 447 result.add(data); |
448 }, | 448 }, |
449 onError: future._setError, | 449 onError: future._setError, |
450 onDone: () { | 450 onDone: () { |
451 future._setValue(result); | 451 future._setValue(result); |
452 }, | 452 }, |
453 unsubscribeOnError: true); | 453 cancelOnError: true); |
454 return future; | 454 return future; |
455 } | 455 } |
456 | 456 |
457 /** Collects the data of this stream in a [Set]. */ | 457 /** Collects the data of this stream in a [Set]. */ |
458 Future<Set<T>> toSet() { | 458 Future<Set<T>> toSet() { |
459 Set<T> result = new Set<T>(); | 459 Set<T> result = new Set<T>(); |
460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); | 460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
461 this.listen( | 461 this.listen( |
462 // TODO(ahe): Restore type when feature is implemented in dart2js | 462 // TODO(ahe): Restore type when feature is implemented in dart2js |
463 // checked mode. http://dartbug.com/7733 | 463 // checked mode. http://dartbug.com/7733 |
464 (/*T*/ data) { | 464 (/*T*/ data) { |
465 result.add(data); | 465 result.add(data); |
466 }, | 466 }, |
467 onError: future._setError, | 467 onError: future._setError, |
468 onDone: () { | 468 onDone: () { |
469 future._setValue(result); | 469 future._setValue(result); |
470 }, | 470 }, |
471 unsubscribeOnError: true); | 471 cancelOnError: true); |
472 return future; | 472 return future; |
473 } | 473 } |
474 | 474 |
475 /** | 475 /** |
476 * Provides at most the first [n] values of this stream. | 476 * Provides at most the first [n] values of this stream. |
477 * | 477 * |
478 * Forwards the first [n] data events of this stream, and all error | 478 * Forwards the first [n] data events of this stream, and all error |
479 * events, to the returned stream, and ends with a done event. | 479 * events, to the returned stream, and ends with a done event. |
480 * | 480 * |
481 * If this stream produces fewer than [count] values before it's done, | 481 * If this stream produces fewer than [count] values before it's done, |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
543 // checked mode. http://dartbug.com/7733 | 543 // checked mode. http://dartbug.com/7733 |
544 (/*T*/ value) { | 544 (/*T*/ value) { |
545 subscription.cancel(); | 545 subscription.cancel(); |
546 future._setValue(value); | 546 future._setValue(value); |
547 return; | 547 return; |
548 }, | 548 }, |
549 onError: future._setError, | 549 onError: future._setError, |
550 onDone: () { | 550 onDone: () { |
551 future._setError(new AsyncError(new StateError("No elements"))); | 551 future._setError(new AsyncError(new StateError("No elements"))); |
552 }, | 552 }, |
553 unsubscribeOnError: true); | 553 cancelOnError: true); |
554 return future; | 554 return future; |
555 } | 555 } |
556 | 556 |
557 /** | 557 /** |
558 * Returns the last element. | 558 * Returns the last element. |
559 * | 559 * |
560 * If [this] is empty throws a [StateError]. | 560 * If [this] is empty throws a [StateError]. |
561 */ | 561 */ |
562 Future<T> get last { | 562 Future<T> get last { |
563 _FutureImpl<T> future = new _FutureImpl<T>(); | 563 _FutureImpl<T> future = new _FutureImpl<T>(); |
564 T result = null; | 564 T result = null; |
565 bool foundResult = false; | 565 bool foundResult = false; |
566 StreamSubscription subscription; | 566 StreamSubscription subscription; |
567 subscription = this.listen( | 567 subscription = this.listen( |
568 // TODO(ahe): Restore type when feature is implemented in dart2js | 568 // TODO(ahe): Restore type when feature is implemented in dart2js |
569 // checked mode. http://dartbug.com/7733 | 569 // checked mode. http://dartbug.com/7733 |
570 (/*T*/ value) { | 570 (/*T*/ value) { |
571 foundResult = true; | 571 foundResult = true; |
572 result = value; | 572 result = value; |
573 }, | 573 }, |
574 onError: future._setError, | 574 onError: future._setError, |
575 onDone: () { | 575 onDone: () { |
576 if (foundResult) { | 576 if (foundResult) { |
577 future._setValue(result); | 577 future._setValue(result); |
578 return; | 578 return; |
579 } | 579 } |
580 future._setError(new AsyncError(new StateError("No elements"))); | 580 future._setError(new AsyncError(new StateError("No elements"))); |
581 }, | 581 }, |
582 unsubscribeOnError: true); | 582 cancelOnError: true); |
583 return future; | 583 return future; |
584 } | 584 } |
585 | 585 |
586 /** | 586 /** |
587 * Returns the single element. | 587 * Returns the single element. |
588 * | 588 * |
589 * If [this] is empty or has more than one element throws a [StateError]. | 589 * If [this] is empty or has more than one element throws a [StateError]. |
590 */ | 590 */ |
591 Future<T> get single { | 591 Future<T> get single { |
592 _FutureImpl<T> future = new _FutureImpl<T>(); | 592 _FutureImpl<T> future = new _FutureImpl<T>(); |
(...skipping 15 matching lines...) Expand all Loading... | |
608 result = value; | 608 result = value; |
609 }, | 609 }, |
610 onError: future._setError, | 610 onError: future._setError, |
611 onDone: () { | 611 onDone: () { |
612 if (foundResult) { | 612 if (foundResult) { |
613 future._setValue(result); | 613 future._setValue(result); |
614 return; | 614 return; |
615 } | 615 } |
616 future._setError(new AsyncError(new StateError("No elements"))); | 616 future._setError(new AsyncError(new StateError("No elements"))); |
617 }, | 617 }, |
618 unsubscribeOnError: true); | 618 cancelOnError: true); |
619 return future; | 619 return future; |
620 } | 620 } |
621 | 621 |
622 /** | 622 /** |
623 * Finds the first element of this stream matching [test]. | 623 * Finds the first element of this stream matching [test]. |
624 * | 624 * |
625 * Returns a future that is filled with the first element of this stream | 625 * Returns a future that is filled with the first element of this stream |
626 * that [test] returns true for. | 626 * that [test] returns true for. |
627 * | 627 * |
628 * If no such element is found before this stream is done, and a | 628 * If no such element is found before this stream is done, and a |
(...skipping 24 matching lines...) Expand all Loading... | |
653 }, | 653 }, |
654 onError: future._setError, | 654 onError: future._setError, |
655 onDone: () { | 655 onDone: () { |
656 if (defaultValue != null) { | 656 if (defaultValue != null) { |
657 _runUserCode(defaultValue, future._setValue, future._setError); | 657 _runUserCode(defaultValue, future._setValue, future._setError); |
658 return; | 658 return; |
659 } | 659 } |
660 future._setError( | 660 future._setError( |
661 new AsyncError(new StateError("firstMatch ended without match"))); | 661 new AsyncError(new StateError("firstMatch ended without match"))); |
662 }, | 662 }, |
663 unsubscribeOnError: true); | 663 cancelOnError: true); |
664 return future; | 664 return future; |
665 } | 665 } |
666 | 666 |
667 /** | 667 /** |
668 * Finds the last element in this stream matching [test]. | 668 * Finds the last element in this stream matching [test]. |
669 * | 669 * |
670 * As [firstWhere], except that the last matching element is found. | 670 * As [firstWhere], except that the last matching element is found. |
671 * That means that the result cannot be provided before this stream | 671 * That means that the result cannot be provided before this stream |
672 * is done. | 672 * is done. |
673 */ | 673 */ |
(...skipping 23 matching lines...) Expand all Loading... | |
697 future._setValue(result); | 697 future._setValue(result); |
698 return; | 698 return; |
699 } | 699 } |
700 if (defaultValue != null) { | 700 if (defaultValue != null) { |
701 _runUserCode(defaultValue, future._setValue, future._setError); | 701 _runUserCode(defaultValue, future._setValue, future._setError); |
702 return; | 702 return; |
703 } | 703 } |
704 future._setError( | 704 future._setError( |
705 new AsyncError(new StateError("lastMatch ended without match"))); | 705 new AsyncError(new StateError("lastMatch ended without match"))); |
706 }, | 706 }, |
707 unsubscribeOnError: true); | 707 cancelOnError: true); |
708 return future; | 708 return future; |
709 } | 709 } |
710 | 710 |
711 /** | 711 /** |
712 * Finds the single element in this stream matching [test]. | 712 * Finds the single element in this stream matching [test]. |
713 * | 713 * |
714 * Like [lastMatch], except that it is an error if more than one | 714 * Like [lastMatch], except that it is an error if more than one |
715 * matching element occurs in the stream. | 715 * matching element occurs in the stream. |
716 */ | 716 */ |
717 Future<T> singleWhere(bool test(T value)) { | 717 Future<T> singleWhere(bool test(T value)) { |
(...skipping 24 matching lines...) Expand all Loading... | |
742 }, | 742 }, |
743 onError: future._setError, | 743 onError: future._setError, |
744 onDone: () { | 744 onDone: () { |
745 if (foundResult) { | 745 if (foundResult) { |
746 future._setValue(result); | 746 future._setValue(result); |
747 return; | 747 return; |
748 } | 748 } |
749 future._setError( | 749 future._setError( |
750 new AsyncError(new StateError("single ended without match"))); | 750 new AsyncError(new StateError("single ended without match"))); |
751 }, | 751 }, |
752 unsubscribeOnError: true); | 752 cancelOnError: true); |
753 return future; | 753 return future; |
754 } | 754 } |
755 | 755 |
756 /** | 756 /** |
757 * Returns the value of the [index]th data event of this stream. | 757 * Returns the value of the [index]th data event of this stream. |
758 * | 758 * |
759 * If an error event occurs, the future will end with this error. | 759 * If an error event occurs, the future will end with this error. |
760 * | 760 * |
761 * If this stream provides fewer than [index] elements before closing, | 761 * If this stream provides fewer than [index] elements before closing, |
762 * an error is reported. | 762 * an error is reported. |
(...skipping 11 matching lines...) Expand all Loading... | |
774 future._setValue(value); | 774 future._setValue(value); |
775 return; | 775 return; |
776 } | 776 } |
777 index -= 1; | 777 index -= 1; |
778 }, | 778 }, |
779 onError: future._setError, | 779 onError: future._setError, |
780 onDone: () { | 780 onDone: () { |
781 future._setError(new AsyncError( | 781 future._setError(new AsyncError( |
782 new StateError("Not enough elements for elementAt"))); | 782 new StateError("Not enough elements for elementAt"))); |
783 }, | 783 }, |
784 unsubscribeOnError: true); | 784 cancelOnError: true); |
785 return future; | 785 return future; |
786 } | 786 } |
787 } | 787 } |
788 | 788 |
789 /** | 789 /** |
790 * A control object for the subscription on a [Stream]. | 790 * A control object for the subscription on a [Stream]. |
791 * | 791 * |
792 * When you subscribe on a [Stream] using [Stream.listen], | 792 * When you subscribe on a [Stream] using [Stream.listen], |
793 * a [StreamSubscription] object is returned. This object | 793 * a [StreamSubscription] object is returned. This object |
794 * is used to later unsubscribe again, or to temporarily pause | 794 * is used to later unsubscribe again, or to temporarily pause |
(...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
831 */ | 831 */ |
832 void resume(); | 832 void resume(); |
833 | 833 |
834 /** | 834 /** |
835 * Returns a future that handles the [onDone] and [onError] callbacks. | 835 * Returns a future that handles the [onDone] and [onError] callbacks. |
836 * | 836 * |
837 * This method *overwrites* the existing [onDone] and [onError] callbacks | 837 * This method *overwrites* the existing [onDone] and [onError] callbacks |
838 * with new ones that complete the returned future. | 838 * with new ones that complete the returned future. |
839 * | 839 * |
840 * In case of an error the subscription will automatically cancel (even | 840 * In case of an error the subscription will automatically cancel (even |
841 * when it was listening with `unsubscribeOnError` set to `false`). | 841 * when it was listening with `cancelOnError` set to `false`). |
842 * | 842 * |
843 * In case of a `done` event the future completes with the given | 843 * In case of a `done` event the future completes with the given |
844 * [futureValue]. | 844 * [futureValue]. |
845 */ | 845 */ |
846 Future asFuture([var futureValue]); | 846 Future asFuture([var futureValue]); |
847 } | 847 } |
848 | 848 |
849 | 849 |
850 /** | 850 /** |
851 * An interface that abstracts creation or handling of [Stream] events. | 851 * An interface that abstracts creation or handling of [Stream] events. |
(...skipping 14 matching lines...) Expand all Loading... | |
866 | 866 |
867 StreamView(this._stream); | 867 StreamView(this._stream); |
868 | 868 |
869 bool get isBroadcast => _stream.isBroadcast; | 869 bool get isBroadcast => _stream.isBroadcast; |
870 | 870 |
871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
872 | 872 |
873 StreamSubscription<T> listen(void onData(T value), | 873 StreamSubscription<T> listen(void onData(T value), |
874 { void onError(AsyncError error), | 874 { void onError(AsyncError error), |
875 void onDone(), | 875 void onDone(), |
876 bool unsubscribeOnError }) { | 876 bool cancelOnError }) { |
877 return _stream.listen(onData, onError: onError, onDone: onDone, | 877 return _stream.listen(onData, onError: onError, onDone: onDone, |
878 unsubscribeOnError: unsubscribeOnError); | 878 cancelOnError: cancelOnError); |
879 } | 879 } |
880 } | 880 } |
881 | 881 |
882 /** | 882 /** |
883 * [EventSink] wrapper that only exposes the [EventSink] interface. | 883 * [EventSink] wrapper that only exposes the [EventSink] interface. |
884 */ | 884 */ |
885 class _EventSinkView<T> extends EventSink<T> { | 885 class _EventSinkView<T> extends EventSink<T> { |
886 final EventSink<T> _sink; | 886 final EventSink<T> _sink; |
887 | 887 |
888 _EventSinkView(this._sink); | 888 _EventSinkView(this._sink); |
(...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1078 class EventTransformStream<S, T> extends Stream<T> { | 1078 class EventTransformStream<S, T> extends Stream<T> { |
1079 final Stream<S> _source; | 1079 final Stream<S> _source; |
1080 final StreamEventTransformer _transformer; | 1080 final StreamEventTransformer _transformer; |
1081 EventTransformStream(Stream<S> source, | 1081 EventTransformStream(Stream<S> source, |
1082 StreamEventTransformer<S, T> transformer) | 1082 StreamEventTransformer<S, T> transformer) |
1083 : _source = source, _transformer = transformer; | 1083 : _source = source, _transformer = transformer; |
1084 | 1084 |
1085 StreamSubscription<T> listen(void onData(T data), | 1085 StreamSubscription<T> listen(void onData(T data), |
1086 { void onError(AsyncError error), | 1086 { void onError(AsyncError error), |
1087 void onDone(), | 1087 void onDone(), |
1088 bool unsubscribeOnError }) { | 1088 bool cancelOnError }) { |
1089 unsubscribeOnError = identical(true, unsubscribeOnError); | 1089 cancelOnError = identical(true, cancelOnError); |
1090 return new _EventTransformStreamSubscription(_source, _transformer, | 1090 return new _EventTransformStreamSubscription(_source, _transformer, |
1091 onData, onError, onDone, | 1091 onData, onError, onDone, |
1092 unsubscribeOnError); | 1092 cancelOnError); |
1093 } | 1093 } |
1094 } | 1094 } |
1095 | 1095 |
1096 class _EventTransformStreamSubscription<S, T> | 1096 class _EventTransformStreamSubscription<S, T> |
1097 extends _BaseStreamSubscription<T> | 1097 extends _BaseStreamSubscription<T> |
1098 implements _EventOutputSink<T> { | 1098 implements _EventOutputSink<T> { |
1099 /** The transformer used to transform events. */ | 1099 /** The transformer used to transform events. */ |
1100 final StreamEventTransformer<S, T> _transformer; | 1100 final StreamEventTransformer<S, T> _transformer; |
1101 /** Whether to unsubscribe when emitting an error. */ | 1101 /** Whether to unsubscribe when emitting an error. */ |
Lasse Reichstein Nielsen
2013/04/15 16:11:03
unsubscribe -> cancel :)
floitsch
2013/04/15 16:13:52
Please file a bug. I will do it later. ;P
| |
1102 final bool _unsubscribeOnError; | 1102 final bool _cancelOnError; |
1103 /** Whether this stream has sent a done event. */ | 1103 /** Whether this stream has sent a done event. */ |
1104 bool _isClosed = false; | 1104 bool _isClosed = false; |
1105 /** Source of incoming events. */ | 1105 /** Source of incoming events. */ |
1106 StreamSubscription<S> _subscription; | 1106 StreamSubscription<S> _subscription; |
1107 /** Cached EventSink wrapper for this class. */ | 1107 /** Cached EventSink wrapper for this class. */ |
1108 EventSink<T> _sink; | 1108 EventSink<T> _sink; |
1109 | 1109 |
1110 _EventTransformStreamSubscription(Stream<S> source, | 1110 _EventTransformStreamSubscription(Stream<S> source, |
1111 this._transformer, | 1111 this._transformer, |
1112 void onData(T data), | 1112 void onData(T data), |
1113 void onError(AsyncError error), | 1113 void onError(AsyncError error), |
1114 void onDone(), | 1114 void onDone(), |
1115 this._unsubscribeOnError) | 1115 this._cancelOnError) |
1116 : super(onData, onError, onDone) { | 1116 : super(onData, onError, onDone) { |
1117 _sink = new _EventOutputSinkWrapper<T>(this); | 1117 _sink = new _EventOutputSinkWrapper<T>(this); |
1118 _subscription = source.listen(_handleData, | 1118 _subscription = source.listen(_handleData, |
1119 onError: _handleError, | 1119 onError: _handleError, |
1120 onDone: _handleDone); | 1120 onDone: _handleDone); |
1121 } | 1121 } |
1122 | 1122 |
1123 /** Whether this subscription is still subscribed to its source. */ | 1123 /** Whether this subscription is still subscribed to its source. */ |
1124 bool get _isSubscribed => _subscription != null; | 1124 bool get _isSubscribed => _subscription != null; |
1125 | 1125 |
(...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1167 | 1167 |
1168 // EventOutputSink interface. | 1168 // EventOutputSink interface. |
1169 void _sendData(T data) { | 1169 void _sendData(T data) { |
1170 if (_isClosed) return; | 1170 if (_isClosed) return; |
1171 _onData(data); | 1171 _onData(data); |
1172 } | 1172 } |
1173 | 1173 |
1174 void _sendError(AsyncError error) { | 1174 void _sendError(AsyncError error) { |
1175 if (_isClosed) return; | 1175 if (_isClosed) return; |
1176 _onError(error); | 1176 _onError(error); |
1177 if (_unsubscribeOnError) { | 1177 if (_cancelOnError) { |
1178 cancel(); | 1178 cancel(); |
1179 } | 1179 } |
1180 } | 1180 } |
1181 | 1181 |
1182 void _sendDone() { | 1182 void _sendDone() { |
1183 if (_isClosed) throw new StateError("Already closed."); | 1183 if (_isClosed) throw new StateError("Already closed."); |
1184 _isClosed = true; | 1184 _isClosed = true; |
1185 if (_isSubscribed) { | 1185 if (_isSubscribed) { |
1186 _subscription.cancel(); | 1186 _subscription.cancel(); |
1187 _subscription = null; | 1187 _subscription = null; |
1188 } | 1188 } |
1189 _onDone(); | 1189 _onDone(); |
1190 } | 1190 } |
1191 } | 1191 } |
1192 | 1192 |
1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
1194 _EventOutputSink _sink; | 1194 _EventOutputSink _sink; |
1195 _EventOutputSinkWrapper(this._sink); | 1195 _EventOutputSinkWrapper(this._sink); |
1196 | 1196 |
1197 void add(T data) { _sink._sendData(data); } | 1197 void add(T data) { _sink._sendData(data); } |
1198 void addError(AsyncError error) { _sink._sendError(error); } | 1198 void addError(AsyncError error) { _sink._sendError(error); } |
1199 void close() { _sink._sendDone(); } | 1199 void close() { _sink._sendDone(); } |
1200 } | 1200 } |
OLD | NEW |