Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 156 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 167 * Adds a subscription to this stream. | 167 * Adds a subscription to this stream. |
| 168 * | 168 * |
| 169 * On each data event from this stream, the subscriber's [onData] handler | 169 * On each data event from this stream, the subscriber's [onData] handler |
| 170 * is called. If [onData] is null, nothing happens. | 170 * is called. If [onData] is null, nothing happens. |
| 171 * | 171 * |
| 172 * On errors from this stream, the [onError] handler is given a | 172 * On errors from this stream, the [onError] handler is given a |
| 173 * [AsyncError] object describing the error. | 173 * [AsyncError] object describing the error. |
| 174 * | 174 * |
| 175 * If this stream closes, the [onDone] handler is called. | 175 * If this stream closes, the [onDone] handler is called. |
| 176 * | 176 * |
| 177 * If [unsubscribeOnError] is true, the subscription is ended when | 177 * If [cancelOnError] is true, the subscription is ended when |
| 178 * the first error is reported. The default is false. | 178 * the first error is reported. The default is false. |
| 179 */ | 179 */ |
| 180 StreamSubscription<T> listen(void onData(T event), | 180 StreamSubscription<T> listen(void onData(T event), |
| 181 { void onError(AsyncError error), | 181 { void onError(AsyncError error), |
| 182 void onDone(), | 182 void onDone(), |
| 183 bool unsubscribeOnError}); | 183 bool cancelOnError}); |
| 184 | 184 |
| 185 /** | 185 /** |
| 186 * Creates a new stream from this stream that discards some data events. | 186 * Creates a new stream from this stream that discards some data events. |
| 187 * | 187 * |
| 188 * The new stream sends the same error and done events as this stream, | 188 * The new stream sends the same error and done events as this stream, |
| 189 * but it only sends the data events that satisfy the [test]. | 189 * but it only sends the data events that satisfy the [test]. |
| 190 */ | 190 */ |
| 191 Stream<T> where(bool test(T event)) { | 191 Stream<T> where(bool test(T event)) { |
| 192 return new _WhereStream<T>(this, test); | 192 return new _WhereStream<T>(this, test); |
| 193 } | 193 } |
| (...skipping 77 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 271 } | 271 } |
| 272 }, | 272 }, |
| 273 onError: result._setError, | 273 onError: result._setError, |
| 274 onDone: () { | 274 onDone: () { |
| 275 if (!seenFirst) { | 275 if (!seenFirst) { |
| 276 result._setError(new AsyncError(new StateError("No elements"))); | 276 result._setError(new AsyncError(new StateError("No elements"))); |
| 277 } else { | 277 } else { |
| 278 result._setValue(value); | 278 result._setValue(value); |
| 279 } | 279 } |
| 280 }, | 280 }, |
| 281 unsubscribeOnError: true | 281 cancelOnError: true |
| 282 ); | 282 ); |
| 283 return result; | 283 return result; |
| 284 } | 284 } |
| 285 | 285 |
| 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 286 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 287 Future fold(var initialValue, combine(var previous, T element)) { | 287 Future fold(var initialValue, combine(var previous, T element)) { |
| 288 _FutureImpl result = new _FutureImpl(); | 288 _FutureImpl result = new _FutureImpl(); |
| 289 var value = initialValue; | 289 var value = initialValue; |
| 290 StreamSubscription subscription; | 290 StreamSubscription subscription; |
| 291 subscription = this.listen( | 291 subscription = this.listen( |
| 292 // TODO(ahe): Restore type when feature is implemented in dart2js | 292 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 293 // checked mode. http://dartbug.com/7733 | 293 // checked mode. http://dartbug.com/7733 |
| 294 (/*T*/ element) { | 294 (/*T*/ element) { |
| 295 _runUserCode( | 295 _runUserCode( |
| 296 () => combine(value, element), | 296 () => combine(value, element), |
| 297 (newValue) { value = newValue; }, | 297 (newValue) { value = newValue; }, |
| 298 _cancelAndError(subscription, result) | 298 _cancelAndError(subscription, result) |
| 299 ); | 299 ); |
| 300 }, | 300 }, |
| 301 onError: (AsyncError e) { | 301 onError: (AsyncError e) { |
| 302 result._setError(e); | 302 result._setError(e); |
| 303 }, | 303 }, |
| 304 onDone: () { | 304 onDone: () { |
| 305 result._setValue(value); | 305 result._setValue(value); |
| 306 }, | 306 }, |
| 307 unsubscribeOnError: true); | 307 cancelOnError: true); |
| 308 return result; | 308 return result; |
| 309 } | 309 } |
| 310 | 310 |
| 311 /** | 311 /** |
| 312 * Checks whether [match] occurs in the elements provided by this stream. | 312 * Checks whether [match] occurs in the elements provided by this stream. |
| 313 * | 313 * |
| 314 * Completes the [Future] when the answer is known. | 314 * Completes the [Future] when the answer is known. |
| 315 * If this stream reports an error, the [Future] will report that error. | 315 * If this stream reports an error, the [Future] will report that error. |
| 316 */ | 316 */ |
| 317 Future<bool> contains(T match) { | 317 Future<bool> contains(T match) { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 329 future._setValue(true); | 329 future._setValue(true); |
| 330 } | 330 } |
| 331 }, | 331 }, |
| 332 _cancelAndError(subscription, future) | 332 _cancelAndError(subscription, future) |
| 333 ); | 333 ); |
| 334 }, | 334 }, |
| 335 onError: future._setError, | 335 onError: future._setError, |
| 336 onDone: () { | 336 onDone: () { |
| 337 future._setValue(false); | 337 future._setValue(false); |
| 338 }, | 338 }, |
| 339 unsubscribeOnError: true); | 339 cancelOnError: true); |
| 340 return future; | 340 return future; |
| 341 } | 341 } |
| 342 | 342 |
| 343 /** | 343 /** |
| 344 * Checks whether [test] accepts all elements provided by this stream. | 344 * Checks whether [test] accepts all elements provided by this stream. |
| 345 * | 345 * |
| 346 * Completes the [Future] when the answer is known. | 346 * Completes the [Future] when the answer is known. |
| 347 * If this stream reports an error, the [Future] will report that error. | 347 * If this stream reports an error, the [Future] will report that error. |
| 348 */ | 348 */ |
| 349 Future<bool> every(bool test(T element)) { | 349 Future<bool> every(bool test(T element)) { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 361 future._setValue(false); | 361 future._setValue(false); |
| 362 } | 362 } |
| 363 }, | 363 }, |
| 364 _cancelAndError(subscription, future) | 364 _cancelAndError(subscription, future) |
| 365 ); | 365 ); |
| 366 }, | 366 }, |
| 367 onError: future._setError, | 367 onError: future._setError, |
| 368 onDone: () { | 368 onDone: () { |
| 369 future._setValue(true); | 369 future._setValue(true); |
| 370 }, | 370 }, |
| 371 unsubscribeOnError: true); | 371 cancelOnError: true); |
| 372 return future; | 372 return future; |
| 373 } | 373 } |
| 374 | 374 |
| 375 /** | 375 /** |
| 376 * Checks whether [test] accepts any element provided by this stream. | 376 * Checks whether [test] accepts any element provided by this stream. |
| 377 * | 377 * |
| 378 * Completes the [Future] when the answer is known. | 378 * Completes the [Future] when the answer is known. |
| 379 * If this stream reports an error, the [Future] will report that error. | 379 * If this stream reports an error, the [Future] will report that error. |
| 380 */ | 380 */ |
| 381 Future<bool> any(bool test(T element)) { | 381 Future<bool> any(bool test(T element)) { |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 393 future._setValue(true); | 393 future._setValue(true); |
| 394 } | 394 } |
| 395 }, | 395 }, |
| 396 _cancelAndError(subscription, future) | 396 _cancelAndError(subscription, future) |
| 397 ); | 397 ); |
| 398 }, | 398 }, |
| 399 onError: future._setError, | 399 onError: future._setError, |
| 400 onDone: () { | 400 onDone: () { |
| 401 future._setValue(false); | 401 future._setValue(false); |
| 402 }, | 402 }, |
| 403 unsubscribeOnError: true); | 403 cancelOnError: true); |
| 404 return future; | 404 return future; |
| 405 } | 405 } |
| 406 | 406 |
| 407 | 407 |
| 408 /** Counts the elements in the stream. */ | 408 /** Counts the elements in the stream. */ |
| 409 Future<int> get length { | 409 Future<int> get length { |
| 410 _FutureImpl<int> future = new _FutureImpl<int>(); | 410 _FutureImpl<int> future = new _FutureImpl<int>(); |
| 411 int count = 0; | 411 int count = 0; |
| 412 this.listen( | 412 this.listen( |
| 413 (_) { count++; }, | 413 (_) { count++; }, |
| 414 onError: future._setError, | 414 onError: future._setError, |
| 415 onDone: () { | 415 onDone: () { |
| 416 future._setValue(count); | 416 future._setValue(count); |
| 417 }, | 417 }, |
| 418 unsubscribeOnError: true); | 418 cancelOnError: true); |
| 419 return future; | 419 return future; |
| 420 } | 420 } |
| 421 | 421 |
| 422 /** Reports whether this stream contains any elements. */ | 422 /** Reports whether this stream contains any elements. */ |
| 423 Future<bool> get isEmpty { | 423 Future<bool> get isEmpty { |
| 424 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 424 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 425 StreamSubscription subscription; | 425 StreamSubscription subscription; |
| 426 subscription = this.listen( | 426 subscription = this.listen( |
| 427 (_) { | 427 (_) { |
| 428 subscription.cancel(); | 428 subscription.cancel(); |
| 429 future._setValue(false); | 429 future._setValue(false); |
| 430 }, | 430 }, |
| 431 onError: future._setError, | 431 onError: future._setError, |
| 432 onDone: () { | 432 onDone: () { |
| 433 future._setValue(true); | 433 future._setValue(true); |
| 434 }, | 434 }, |
| 435 unsubscribeOnError: true); | 435 cancelOnError: true); |
| 436 return future; | 436 return future; |
| 437 } | 437 } |
| 438 | 438 |
| 439 /** Collects the data of this stream in a [List]. */ | 439 /** Collects the data of this stream in a [List]. */ |
| 440 Future<List<T>> toList() { | 440 Future<List<T>> toList() { |
| 441 List<T> result = <T>[]; | 441 List<T> result = <T>[]; |
| 442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); | 442 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
| 443 this.listen( | 443 this.listen( |
| 444 // TODO(ahe): Restore type when feature is implemented in dart2js | 444 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 445 // checked mode. http://dartbug.com/7733 | 445 // checked mode. http://dartbug.com/7733 |
| 446 (/*T*/ data) { | 446 (/*T*/ data) { |
| 447 result.add(data); | 447 result.add(data); |
| 448 }, | 448 }, |
| 449 onError: future._setError, | 449 onError: future._setError, |
| 450 onDone: () { | 450 onDone: () { |
| 451 future._setValue(result); | 451 future._setValue(result); |
| 452 }, | 452 }, |
| 453 unsubscribeOnError: true); | 453 cancelOnError: true); |
| 454 return future; | 454 return future; |
| 455 } | 455 } |
| 456 | 456 |
| 457 /** Collects the data of this stream in a [Set]. */ | 457 /** Collects the data of this stream in a [Set]. */ |
| 458 Future<Set<T>> toSet() { | 458 Future<Set<T>> toSet() { |
| 459 Set<T> result = new Set<T>(); | 459 Set<T> result = new Set<T>(); |
| 460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); | 460 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
| 461 this.listen( | 461 this.listen( |
| 462 // TODO(ahe): Restore type when feature is implemented in dart2js | 462 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 463 // checked mode. http://dartbug.com/7733 | 463 // checked mode. http://dartbug.com/7733 |
| 464 (/*T*/ data) { | 464 (/*T*/ data) { |
| 465 result.add(data); | 465 result.add(data); |
| 466 }, | 466 }, |
| 467 onError: future._setError, | 467 onError: future._setError, |
| 468 onDone: () { | 468 onDone: () { |
| 469 future._setValue(result); | 469 future._setValue(result); |
| 470 }, | 470 }, |
| 471 unsubscribeOnError: true); | 471 cancelOnError: true); |
| 472 return future; | 472 return future; |
| 473 } | 473 } |
| 474 | 474 |
| 475 /** | 475 /** |
| 476 * Provides at most the first [n] values of this stream. | 476 * Provides at most the first [n] values of this stream. |
| 477 * | 477 * |
| 478 * Forwards the first [n] data events of this stream, and all error | 478 * Forwards the first [n] data events of this stream, and all error |
| 479 * events, to the returned stream, and ends with a done event. | 479 * events, to the returned stream, and ends with a done event. |
| 480 * | 480 * |
| 481 * If this stream produces fewer than [count] values before it's done, | 481 * If this stream produces fewer than [count] values before it's done, |
| (...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 543 // checked mode. http://dartbug.com/7733 | 543 // checked mode. http://dartbug.com/7733 |
| 544 (/*T*/ value) { | 544 (/*T*/ value) { |
| 545 subscription.cancel(); | 545 subscription.cancel(); |
| 546 future._setValue(value); | 546 future._setValue(value); |
| 547 return; | 547 return; |
| 548 }, | 548 }, |
| 549 onError: future._setError, | 549 onError: future._setError, |
| 550 onDone: () { | 550 onDone: () { |
| 551 future._setError(new AsyncError(new StateError("No elements"))); | 551 future._setError(new AsyncError(new StateError("No elements"))); |
| 552 }, | 552 }, |
| 553 unsubscribeOnError: true); | 553 cancelOnError: true); |
| 554 return future; | 554 return future; |
| 555 } | 555 } |
| 556 | 556 |
| 557 /** | 557 /** |
| 558 * Returns the last element. | 558 * Returns the last element. |
| 559 * | 559 * |
| 560 * If [this] is empty throws a [StateError]. | 560 * If [this] is empty throws a [StateError]. |
| 561 */ | 561 */ |
| 562 Future<T> get last { | 562 Future<T> get last { |
| 563 _FutureImpl<T> future = new _FutureImpl<T>(); | 563 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 564 T result = null; | 564 T result = null; |
| 565 bool foundResult = false; | 565 bool foundResult = false; |
| 566 StreamSubscription subscription; | 566 StreamSubscription subscription; |
| 567 subscription = this.listen( | 567 subscription = this.listen( |
| 568 // TODO(ahe): Restore type when feature is implemented in dart2js | 568 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 569 // checked mode. http://dartbug.com/7733 | 569 // checked mode. http://dartbug.com/7733 |
| 570 (/*T*/ value) { | 570 (/*T*/ value) { |
| 571 foundResult = true; | 571 foundResult = true; |
| 572 result = value; | 572 result = value; |
| 573 }, | 573 }, |
| 574 onError: future._setError, | 574 onError: future._setError, |
| 575 onDone: () { | 575 onDone: () { |
| 576 if (foundResult) { | 576 if (foundResult) { |
| 577 future._setValue(result); | 577 future._setValue(result); |
| 578 return; | 578 return; |
| 579 } | 579 } |
| 580 future._setError(new AsyncError(new StateError("No elements"))); | 580 future._setError(new AsyncError(new StateError("No elements"))); |
| 581 }, | 581 }, |
| 582 unsubscribeOnError: true); | 582 cancelOnError: true); |
| 583 return future; | 583 return future; |
| 584 } | 584 } |
| 585 | 585 |
| 586 /** | 586 /** |
| 587 * Returns the single element. | 587 * Returns the single element. |
| 588 * | 588 * |
| 589 * If [this] is empty or has more than one element throws a [StateError]. | 589 * If [this] is empty or has more than one element throws a [StateError]. |
| 590 */ | 590 */ |
| 591 Future<T> get single { | 591 Future<T> get single { |
| 592 _FutureImpl<T> future = new _FutureImpl<T>(); | 592 _FutureImpl<T> future = new _FutureImpl<T>(); |
| (...skipping 15 matching lines...) Expand all Loading... | |
| 608 result = value; | 608 result = value; |
| 609 }, | 609 }, |
| 610 onError: future._setError, | 610 onError: future._setError, |
| 611 onDone: () { | 611 onDone: () { |
| 612 if (foundResult) { | 612 if (foundResult) { |
| 613 future._setValue(result); | 613 future._setValue(result); |
| 614 return; | 614 return; |
| 615 } | 615 } |
| 616 future._setError(new AsyncError(new StateError("No elements"))); | 616 future._setError(new AsyncError(new StateError("No elements"))); |
| 617 }, | 617 }, |
| 618 unsubscribeOnError: true); | 618 cancelOnError: true); |
| 619 return future; | 619 return future; |
| 620 } | 620 } |
| 621 | 621 |
| 622 /** | 622 /** |
| 623 * Finds the first element of this stream matching [test]. | 623 * Finds the first element of this stream matching [test]. |
| 624 * | 624 * |
| 625 * Returns a future that is filled with the first element of this stream | 625 * Returns a future that is filled with the first element of this stream |
| 626 * that [test] returns true for. | 626 * that [test] returns true for. |
| 627 * | 627 * |
| 628 * If no such element is found before this stream is done, and a | 628 * If no such element is found before this stream is done, and a |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 653 }, | 653 }, |
| 654 onError: future._setError, | 654 onError: future._setError, |
| 655 onDone: () { | 655 onDone: () { |
| 656 if (defaultValue != null) { | 656 if (defaultValue != null) { |
| 657 _runUserCode(defaultValue, future._setValue, future._setError); | 657 _runUserCode(defaultValue, future._setValue, future._setError); |
| 658 return; | 658 return; |
| 659 } | 659 } |
| 660 future._setError( | 660 future._setError( |
| 661 new AsyncError(new StateError("firstMatch ended without match"))); | 661 new AsyncError(new StateError("firstMatch ended without match"))); |
| 662 }, | 662 }, |
| 663 unsubscribeOnError: true); | 663 cancelOnError: true); |
| 664 return future; | 664 return future; |
| 665 } | 665 } |
| 666 | 666 |
| 667 /** | 667 /** |
| 668 * Finds the last element in this stream matching [test]. | 668 * Finds the last element in this stream matching [test]. |
| 669 * | 669 * |
| 670 * As [firstWhere], except that the last matching element is found. | 670 * As [firstWhere], except that the last matching element is found. |
| 671 * That means that the result cannot be provided before this stream | 671 * That means that the result cannot be provided before this stream |
| 672 * is done. | 672 * is done. |
| 673 */ | 673 */ |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 697 future._setValue(result); | 697 future._setValue(result); |
| 698 return; | 698 return; |
| 699 } | 699 } |
| 700 if (defaultValue != null) { | 700 if (defaultValue != null) { |
| 701 _runUserCode(defaultValue, future._setValue, future._setError); | 701 _runUserCode(defaultValue, future._setValue, future._setError); |
| 702 return; | 702 return; |
| 703 } | 703 } |
| 704 future._setError( | 704 future._setError( |
| 705 new AsyncError(new StateError("lastMatch ended without match"))); | 705 new AsyncError(new StateError("lastMatch ended without match"))); |
| 706 }, | 706 }, |
| 707 unsubscribeOnError: true); | 707 cancelOnError: true); |
| 708 return future; | 708 return future; |
| 709 } | 709 } |
| 710 | 710 |
| 711 /** | 711 /** |
| 712 * Finds the single element in this stream matching [test]. | 712 * Finds the single element in this stream matching [test]. |
| 713 * | 713 * |
| 714 * Like [lastMatch], except that it is an error if more than one | 714 * Like [lastMatch], except that it is an error if more than one |
| 715 * matching element occurs in the stream. | 715 * matching element occurs in the stream. |
| 716 */ | 716 */ |
| 717 Future<T> singleWhere(bool test(T value)) { | 717 Future<T> singleWhere(bool test(T value)) { |
| (...skipping 24 matching lines...) Expand all Loading... | |
| 742 }, | 742 }, |
| 743 onError: future._setError, | 743 onError: future._setError, |
| 744 onDone: () { | 744 onDone: () { |
| 745 if (foundResult) { | 745 if (foundResult) { |
| 746 future._setValue(result); | 746 future._setValue(result); |
| 747 return; | 747 return; |
| 748 } | 748 } |
| 749 future._setError( | 749 future._setError( |
| 750 new AsyncError(new StateError("single ended without match"))); | 750 new AsyncError(new StateError("single ended without match"))); |
| 751 }, | 751 }, |
| 752 unsubscribeOnError: true); | 752 cancelOnError: true); |
| 753 return future; | 753 return future; |
| 754 } | 754 } |
| 755 | 755 |
| 756 /** | 756 /** |
| 757 * Returns the value of the [index]th data event of this stream. | 757 * Returns the value of the [index]th data event of this stream. |
| 758 * | 758 * |
| 759 * If an error event occurs, the future will end with this error. | 759 * If an error event occurs, the future will end with this error. |
| 760 * | 760 * |
| 761 * If this stream provides fewer than [index] elements before closing, | 761 * If this stream provides fewer than [index] elements before closing, |
| 762 * an error is reported. | 762 * an error is reported. |
| (...skipping 11 matching lines...) Expand all Loading... | |
| 774 future._setValue(value); | 774 future._setValue(value); |
| 775 return; | 775 return; |
| 776 } | 776 } |
| 777 index -= 1; | 777 index -= 1; |
| 778 }, | 778 }, |
| 779 onError: future._setError, | 779 onError: future._setError, |
| 780 onDone: () { | 780 onDone: () { |
| 781 future._setError(new AsyncError( | 781 future._setError(new AsyncError( |
| 782 new StateError("Not enough elements for elementAt"))); | 782 new StateError("Not enough elements for elementAt"))); |
| 783 }, | 783 }, |
| 784 unsubscribeOnError: true); | 784 cancelOnError: true); |
| 785 return future; | 785 return future; |
| 786 } | 786 } |
| 787 } | 787 } |
| 788 | 788 |
| 789 /** | 789 /** |
| 790 * A control object for the subscription on a [Stream]. | 790 * A control object for the subscription on a [Stream]. |
| 791 * | 791 * |
| 792 * When you subscribe on a [Stream] using [Stream.listen], | 792 * When you subscribe on a [Stream] using [Stream.listen], |
| 793 * a [StreamSubscription] object is returned. This object | 793 * a [StreamSubscription] object is returned. This object |
| 794 * is used to later unsubscribe again, or to temporarily pause | 794 * is used to later unsubscribe again, or to temporarily pause |
| (...skipping 36 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 831 */ | 831 */ |
| 832 void resume(); | 832 void resume(); |
| 833 | 833 |
| 834 /** | 834 /** |
| 835 * Returns a future that handles the [onDone] and [onError] callbacks. | 835 * Returns a future that handles the [onDone] and [onError] callbacks. |
| 836 * | 836 * |
| 837 * This method *overwrites* the existing [onDone] and [onError] callbacks | 837 * This method *overwrites* the existing [onDone] and [onError] callbacks |
| 838 * with new ones that complete the returned future. | 838 * with new ones that complete the returned future. |
| 839 * | 839 * |
| 840 * In case of an error the subscription will automatically cancel (even | 840 * In case of an error the subscription will automatically cancel (even |
| 841 * when it was listening with `unsubscribeOnError` set to `false`). | 841 * when it was listening with `cancelOnError` set to `false`). |
| 842 * | 842 * |
| 843 * In case of a `done` event the future completes with the given | 843 * In case of a `done` event the future completes with the given |
| 844 * [futureValue]. | 844 * [futureValue]. |
| 845 */ | 845 */ |
| 846 Future asFuture([var futureValue]); | 846 Future asFuture([var futureValue]); |
| 847 } | 847 } |
| 848 | 848 |
| 849 | 849 |
| 850 /** | 850 /** |
| 851 * An interface that abstracts creation or handling of [Stream] events. | 851 * An interface that abstracts creation or handling of [Stream] events. |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 866 | 866 |
| 867 StreamView(this._stream); | 867 StreamView(this._stream); |
| 868 | 868 |
| 869 bool get isBroadcast => _stream.isBroadcast; | 869 bool get isBroadcast => _stream.isBroadcast; |
| 870 | 870 |
| 871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); | 871 Stream<T> asBroadcastStream() => _stream.asBroadcastStream(); |
| 872 | 872 |
| 873 StreamSubscription<T> listen(void onData(T value), | 873 StreamSubscription<T> listen(void onData(T value), |
| 874 { void onError(AsyncError error), | 874 { void onError(AsyncError error), |
| 875 void onDone(), | 875 void onDone(), |
| 876 bool unsubscribeOnError }) { | 876 bool cancelOnError }) { |
| 877 return _stream.listen(onData, onError: onError, onDone: onDone, | 877 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 878 unsubscribeOnError: unsubscribeOnError); | 878 cancelOnError: cancelOnError); |
| 879 } | 879 } |
| 880 } | 880 } |
| 881 | 881 |
| 882 /** | 882 /** |
| 883 * [EventSink] wrapper that only exposes the [EventSink] interface. | 883 * [EventSink] wrapper that only exposes the [EventSink] interface. |
| 884 */ | 884 */ |
| 885 class _EventSinkView<T> extends EventSink<T> { | 885 class _EventSinkView<T> extends EventSink<T> { |
| 886 final EventSink<T> _sink; | 886 final EventSink<T> _sink; |
| 887 | 887 |
| 888 _EventSinkView(this._sink); | 888 _EventSinkView(this._sink); |
| (...skipping 189 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1078 class EventTransformStream<S, T> extends Stream<T> { | 1078 class EventTransformStream<S, T> extends Stream<T> { |
| 1079 final Stream<S> _source; | 1079 final Stream<S> _source; |
| 1080 final StreamEventTransformer _transformer; | 1080 final StreamEventTransformer _transformer; |
| 1081 EventTransformStream(Stream<S> source, | 1081 EventTransformStream(Stream<S> source, |
| 1082 StreamEventTransformer<S, T> transformer) | 1082 StreamEventTransformer<S, T> transformer) |
| 1083 : _source = source, _transformer = transformer; | 1083 : _source = source, _transformer = transformer; |
| 1084 | 1084 |
| 1085 StreamSubscription<T> listen(void onData(T data), | 1085 StreamSubscription<T> listen(void onData(T data), |
| 1086 { void onError(AsyncError error), | 1086 { void onError(AsyncError error), |
| 1087 void onDone(), | 1087 void onDone(), |
| 1088 bool unsubscribeOnError }) { | 1088 bool cancelOnError }) { |
| 1089 unsubscribeOnError = identical(true, unsubscribeOnError); | 1089 cancelOnError = identical(true, cancelOnError); |
| 1090 return new _EventTransformStreamSubscription(_source, _transformer, | 1090 return new _EventTransformStreamSubscription(_source, _transformer, |
| 1091 onData, onError, onDone, | 1091 onData, onError, onDone, |
| 1092 unsubscribeOnError); | 1092 cancelOnError); |
| 1093 } | 1093 } |
| 1094 } | 1094 } |
| 1095 | 1095 |
| 1096 class _EventTransformStreamSubscription<S, T> | 1096 class _EventTransformStreamSubscription<S, T> |
| 1097 extends _BaseStreamSubscription<T> | 1097 extends _BaseStreamSubscription<T> |
| 1098 implements _EventOutputSink<T> { | 1098 implements _EventOutputSink<T> { |
| 1099 /** The transformer used to transform events. */ | 1099 /** The transformer used to transform events. */ |
| 1100 final StreamEventTransformer<S, T> _transformer; | 1100 final StreamEventTransformer<S, T> _transformer; |
| 1101 /** Whether to unsubscribe when emitting an error. */ | 1101 /** Whether to unsubscribe when emitting an error. */ |
|
Lasse Reichstein Nielsen
2013/04/15 16:11:03
unsubscribe -> cancel :)
floitsch
2013/04/15 16:13:52
Please file a bug. I will do it later. ;P
| |
| 1102 final bool _unsubscribeOnError; | 1102 final bool _cancelOnError; |
| 1103 /** Whether this stream has sent a done event. */ | 1103 /** Whether this stream has sent a done event. */ |
| 1104 bool _isClosed = false; | 1104 bool _isClosed = false; |
| 1105 /** Source of incoming events. */ | 1105 /** Source of incoming events. */ |
| 1106 StreamSubscription<S> _subscription; | 1106 StreamSubscription<S> _subscription; |
| 1107 /** Cached EventSink wrapper for this class. */ | 1107 /** Cached EventSink wrapper for this class. */ |
| 1108 EventSink<T> _sink; | 1108 EventSink<T> _sink; |
| 1109 | 1109 |
| 1110 _EventTransformStreamSubscription(Stream<S> source, | 1110 _EventTransformStreamSubscription(Stream<S> source, |
| 1111 this._transformer, | 1111 this._transformer, |
| 1112 void onData(T data), | 1112 void onData(T data), |
| 1113 void onError(AsyncError error), | 1113 void onError(AsyncError error), |
| 1114 void onDone(), | 1114 void onDone(), |
| 1115 this._unsubscribeOnError) | 1115 this._cancelOnError) |
| 1116 : super(onData, onError, onDone) { | 1116 : super(onData, onError, onDone) { |
| 1117 _sink = new _EventOutputSinkWrapper<T>(this); | 1117 _sink = new _EventOutputSinkWrapper<T>(this); |
| 1118 _subscription = source.listen(_handleData, | 1118 _subscription = source.listen(_handleData, |
| 1119 onError: _handleError, | 1119 onError: _handleError, |
| 1120 onDone: _handleDone); | 1120 onDone: _handleDone); |
| 1121 } | 1121 } |
| 1122 | 1122 |
| 1123 /** Whether this subscription is still subscribed to its source. */ | 1123 /** Whether this subscription is still subscribed to its source. */ |
| 1124 bool get _isSubscribed => _subscription != null; | 1124 bool get _isSubscribed => _subscription != null; |
| 1125 | 1125 |
| (...skipping 41 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 1167 | 1167 |
| 1168 // EventOutputSink interface. | 1168 // EventOutputSink interface. |
| 1169 void _sendData(T data) { | 1169 void _sendData(T data) { |
| 1170 if (_isClosed) return; | 1170 if (_isClosed) return; |
| 1171 _onData(data); | 1171 _onData(data); |
| 1172 } | 1172 } |
| 1173 | 1173 |
| 1174 void _sendError(AsyncError error) { | 1174 void _sendError(AsyncError error) { |
| 1175 if (_isClosed) return; | 1175 if (_isClosed) return; |
| 1176 _onError(error); | 1176 _onError(error); |
| 1177 if (_unsubscribeOnError) { | 1177 if (_cancelOnError) { |
| 1178 cancel(); | 1178 cancel(); |
| 1179 } | 1179 } |
| 1180 } | 1180 } |
| 1181 | 1181 |
| 1182 void _sendDone() { | 1182 void _sendDone() { |
| 1183 if (_isClosed) throw new StateError("Already closed."); | 1183 if (_isClosed) throw new StateError("Already closed."); |
| 1184 _isClosed = true; | 1184 _isClosed = true; |
| 1185 if (_isSubscribed) { | 1185 if (_isSubscribed) { |
| 1186 _subscription.cancel(); | 1186 _subscription.cancel(); |
| 1187 _subscription = null; | 1187 _subscription = null; |
| 1188 } | 1188 } |
| 1189 _onDone(); | 1189 _onDone(); |
| 1190 } | 1190 } |
| 1191 } | 1191 } |
| 1192 | 1192 |
| 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { | 1193 class _EventOutputSinkWrapper<T> extends EventSink<T> { |
| 1194 _EventOutputSink _sink; | 1194 _EventOutputSink _sink; |
| 1195 _EventOutputSinkWrapper(this._sink); | 1195 _EventOutputSinkWrapper(this._sink); |
| 1196 | 1196 |
| 1197 void add(T data) { _sink._sendData(data); } | 1197 void add(T data) { _sink._sendData(data); } |
| 1198 void addError(AsyncError error) { _sink._sendError(error); } | 1198 void addError(AsyncError error) { _sink._sendError(error); } |
| 1199 void close() { _sink._sendDone(); } | 1199 void close() { _sink._sendDone(); } |
| 1200 } | 1200 } |
| OLD | NEW |