OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 278 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
289 } | 289 } |
290 | 290 |
291 /** | 291 /** |
292 * Creates a new stream that converts each element of this stream | 292 * Creates a new stream that converts each element of this stream |
293 * to a new value using the [convert] function. | 293 * to a new value using the [convert] function. |
294 * | 294 * |
295 * The returned stream is a broadcast stream if this stream is. | 295 * The returned stream is a broadcast stream if this stream is. |
296 * If a broadcast stream is listened to more than once, each subscription | 296 * If a broadcast stream is listened to more than once, each subscription |
297 * will individually execute `map` for each event. | 297 * will individually execute `map` for each event. |
298 */ | 298 */ |
299 Stream map(convert(T event)) { | 299 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { |
300 return new _MapStream<T, dynamic>(this, convert); | 300 return new _MapStream<T, dynamic/*=S*/>(this, convert); |
301 } | 301 } |
302 | 302 |
303 /** | 303 /** |
304 * Creates a new stream with each data event of this stream asynchronously | 304 * Creates a new stream with each data event of this stream asynchronously |
305 * mapped to a new event. | 305 * mapped to a new event. |
306 * | 306 * |
307 * This acts like [map], except that [convert] may return a [Future], | 307 * This acts like [map], except that [convert] may return a [Future], |
308 * and in that case, the stream waits for that future to complete before | 308 * and in that case, the stream waits for that future to complete before |
309 * continuing with its result. | 309 * continuing with its result. |
310 * | 310 * |
(...skipping 140 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
451 * into zero or more events. | 451 * into zero or more events. |
452 * | 452 * |
453 * Each incoming event is converted to an [Iterable] of new events, | 453 * Each incoming event is converted to an [Iterable] of new events, |
454 * and each of these new events are then sent by the returned stream | 454 * and each of these new events are then sent by the returned stream |
455 * in order. | 455 * in order. |
456 * | 456 * |
457 * The returned stream is a broadcast stream if this stream is. | 457 * The returned stream is a broadcast stream if this stream is. |
458 * If a broadcast stream is listened to more than once, each subscription | 458 * If a broadcast stream is listened to more than once, each subscription |
459 * will individually call `convert` and expand the events. | 459 * will individually call `convert` and expand the events. |
460 */ | 460 */ |
461 Stream expand(Iterable convert(T value)) { | 461 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { |
462 return new _ExpandStream<T, dynamic>(this, convert); | 462 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); |
463 } | 463 } |
464 | 464 |
465 /** | 465 /** |
466 * Binds this stream as the input of the provided [StreamConsumer]. | 466 * Binds this stream as the input of the provided [StreamConsumer]. |
467 * | 467 * |
468 * The `streamConsumer` is closed when the stream has been added to it. | 468 * The `streamConsumer` is closed when the stream has been added to it. |
469 * | 469 * |
470 * Returns a future which completes when the stream has been consumed | 470 * Returns a future which completes when the stream has been consumed |
471 * and the consumer has been closed. | 471 * and the consumer has been closed. |
472 */ | 472 */ |
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
516 } else { | 516 } else { |
517 result._complete(value); | 517 result._complete(value); |
518 } | 518 } |
519 }, | 519 }, |
520 cancelOnError: true | 520 cancelOnError: true |
521 ); | 521 ); |
522 return result; | 522 return result; |
523 } | 523 } |
524 | 524 |
525 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 525 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
526 Future fold(var initialValue, combine(var previous, T element)) { | 526 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, |
527 _Future result = new _Future(); | 527 /*=S*/ combine(var/*=S*/ previous, T element)) { |
| 528 _Future/*<S>*/ result = new _Future(); |
528 var value = initialValue; | 529 var value = initialValue; |
529 StreamSubscription subscription; | 530 StreamSubscription subscription; |
530 subscription = this.listen( | 531 subscription = this.listen( |
531 (T element) { | 532 (T element) { |
532 _runUserCode( | 533 _runUserCode( |
533 () => combine(value, element), | 534 () => combine(value, element), |
534 (newValue) { value = newValue; }, | 535 (newValue) { value = newValue; }, |
535 _cancelAndErrorClosure(subscription, result) | 536 _cancelAndErrorClosure(subscription, result) |
536 ); | 537 ); |
537 }, | 538 }, |
(...skipping 1023 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1561 * }, | 1562 * }, |
1562 * onPause: subscription.pause, | 1563 * onPause: subscription.pause, |
1563 * onResume: subscription.resume, | 1564 * onResume: subscription.resume, |
1564 * onCancel: subscription.cancel, | 1565 * onCancel: subscription.cancel, |
1565 * sync: true); | 1566 * sync: true); |
1566 * return controller.stream.listen(null); | 1567 * return controller.stream.listen(null); |
1567 * }); | 1568 * }); |
1568 */ | 1569 */ |
1569 const factory StreamTransformer( | 1570 const factory StreamTransformer( |
1570 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | 1571 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
1571 = _StreamSubscriptionTransformer; | 1572 = _StreamSubscriptionTransformer<S, T>; |
1572 | 1573 |
1573 /** | 1574 /** |
1574 * Creates a [StreamTransformer] that delegates events to the given functions. | 1575 * Creates a [StreamTransformer] that delegates events to the given functions. |
1575 * | 1576 * |
1576 * Example use of a duplicating transformer: | 1577 * Example use of a duplicating transformer: |
1577 * | 1578 * |
1578 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1579 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1579 * handleData: (String value, EventSink<String> sink) { | 1580 * handleData: (String value, EventSink<String> sink) { |
1580 * sink.add(value); | 1581 * sink.add(value); |
1581 * sink.add(value); // Duplicate the incoming events. | 1582 * sink.add(value); // Duplicate the incoming events. |
1582 * })); | 1583 * })); |
1583 */ | 1584 */ |
1584 factory StreamTransformer.fromHandlers({ | 1585 factory StreamTransformer.fromHandlers({ |
1585 void handleData(S data, EventSink<T> sink), | 1586 void handleData(S data, EventSink<T> sink), |
1586 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1587 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1587 void handleDone(EventSink<T> sink)}) | 1588 void handleDone(EventSink<T> sink)}) |
1588 = _StreamHandlerTransformer; | 1589 = _StreamHandlerTransformer<S, T>; |
1589 | 1590 |
1590 /** | 1591 /** |
1591 * Transform the incoming [stream]'s events. | 1592 * Transform the incoming [stream]'s events. |
1592 * | 1593 * |
1593 * Creates a new stream. | 1594 * Creates a new stream. |
1594 * When this stream is listened to, it will start listening on [stream], | 1595 * When this stream is listened to, it will start listening on [stream], |
1595 * and generate events on the new stream based on the events from [stream]. | 1596 * and generate events on the new stream based on the events from [stream]. |
1596 * | 1597 * |
1597 * Subscriptions on the returned stream should propagate pause state | 1598 * Subscriptions on the returned stream should propagate pause state |
1598 * to the subscription on [stream]. | 1599 * to the subscription on [stream]. |
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1660 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1661 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1661 EventSink _sink; | 1662 EventSink _sink; |
1662 _ControllerEventSinkWrapper(this._sink); | 1663 _ControllerEventSinkWrapper(this._sink); |
1663 | 1664 |
1664 void add(T data) { _sink.add(data); } | 1665 void add(T data) { _sink.add(data); } |
1665 void addError(error, [StackTrace stackTrace]) { | 1666 void addError(error, [StackTrace stackTrace]) { |
1666 _sink.addError(error, stackTrace); | 1667 _sink.addError(error, stackTrace); |
1667 } | 1668 } |
1668 void close() { _sink.close(); } | 1669 void close() { _sink.close(); } |
1669 } | 1670 } |
OLD | NEW |