| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 276 * stringStream.transform(new DuplicationTransformer()); | 276 * stringStream.transform(new DuplicationTransformer()); |
| 277 * | 277 * |
| 278 * The resulting stream is a broadcast stream if [source] is. | 278 * The resulting stream is a broadcast stream if [source] is. |
| 279 */ | 279 */ |
| 280 factory Stream.eventTransformed( | 280 factory Stream.eventTransformed( |
| 281 Stream source, EventSink mapSink(EventSink<T> sink)) { | 281 Stream source, EventSink mapSink(EventSink<T> sink)) { |
| 282 return new _BoundSinkStream(source, mapSink); | 282 return new _BoundSinkStream(source, mapSink); |
| 283 } | 283 } |
| 284 | 284 |
| 285 /** | 285 /** |
| 286 * Reports whether this stream is a broadcast stream. | 286 * Whether this stream is a broadcast stream. |
| 287 */ | 287 */ |
| 288 bool get isBroadcast => false; | 288 bool get isBroadcast => false; |
| 289 | 289 |
| 290 /** | 290 /** |
| 291 * Returns a multi-subscription stream that produces the same events as this. | 291 * Returns a multi-subscription stream that produces the same events as this. |
| 292 * | 292 * |
| 293 * The returned stream will subscribe to this stream when its first | 293 * The returned stream will subscribe to this stream when its first |
| 294 * subscriber is added, and will stay subscribed until this stream ends, | 294 * subscriber is added, and will stay subscribed until this stream ends, |
| 295 * or a callback cancels the subscription. | 295 * or a callback cancels the subscription. |
| 296 * | 296 * |
| (...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 389 /** | 389 /** |
| 390 * Creates a new stream with each data event of this stream asynchronously | 390 * Creates a new stream with each data event of this stream asynchronously |
| 391 * mapped to a new event. | 391 * mapped to a new event. |
| 392 * | 392 * |
| 393 * This acts like [map], except that [convert] may return a [Future], | 393 * This acts like [map], except that [convert] may return a [Future], |
| 394 * and in that case, the stream waits for that future to complete before | 394 * and in that case, the stream waits for that future to complete before |
| 395 * continuing with its result. | 395 * continuing with its result. |
| 396 * | 396 * |
| 397 * The returned stream is a broadcast stream if this stream is. | 397 * The returned stream is a broadcast stream if this stream is. |
| 398 */ | 398 */ |
| 399 Stream<E> asyncMap<E>(convert(T event)) { | 399 Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { |
| 400 StreamController<E> controller; | 400 StreamController<E> controller; |
| 401 StreamSubscription<T> subscription; | 401 StreamSubscription<T> subscription; |
| 402 | 402 |
| 403 void onListen() { | 403 void onListen() { |
| 404 final add = controller.add; | 404 final add = controller.add; |
| 405 assert(controller is _StreamController || | 405 assert(controller is _StreamController || |
| 406 controller is _BroadcastStreamController); | 406 controller is _BroadcastStreamController); |
| 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; | 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
| 408 final addError = eventSink._addError; | 408 final addError = eventSink._addError; |
| 409 subscription = this.listen((T event) { | 409 subscription = this.listen((T event) { |
| 410 dynamic newValue; | 410 FutureOr<E> newValue; |
| 411 try { | 411 try { |
| 412 newValue = convert(event); | 412 newValue = convert(event); |
| 413 } catch (e, s) { | 413 } catch (e, s) { |
| 414 controller.addError(e, s); | 414 controller.addError(e, s); |
| 415 return; | 415 return; |
| 416 } | 416 } |
| 417 if (newValue is Future) { | 417 if (newValue is Future<E>) { |
| 418 subscription.pause(); | 418 subscription.pause(); |
| 419 newValue | 419 newValue |
| 420 .then(add, onError: addError) | 420 .then(add, onError: addError) |
| 421 .whenComplete(subscription.resume); | 421 .whenComplete(subscription.resume); |
| 422 } else { | 422 } else { |
| 423 controller.add(newValue as Object/*=E*/); | 423 controller.add(newValue as Object/*=E*/); |
| 424 } | 424 } |
| 425 }, onError: addError, onDone: controller.close); | 425 }, onError: addError, onDone: controller.close); |
| 426 } | 426 } |
| 427 | 427 |
| (...skipping 275 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 703 * | 703 * |
| 704 * Completes the returned [Future] when all events of the stream | 704 * Completes the returned [Future] when all events of the stream |
| 705 * have been processed. Completes the future with an error if the | 705 * have been processed. Completes the future with an error if the |
| 706 * stream has an error event, or if [action] throws. | 706 * stream has an error event, or if [action] throws. |
| 707 */ | 707 */ |
| 708 Future forEach(void action(T element)) { | 708 Future forEach(void action(T element)) { |
| 709 _Future future = new _Future(); | 709 _Future future = new _Future(); |
| 710 StreamSubscription subscription; | 710 StreamSubscription subscription; |
| 711 subscription = this.listen( | 711 subscription = this.listen( |
| 712 (T element) { | 712 (T element) { |
| 713 _runUserCode(() => action(element), (_) {}, | 713 // TODO(floitsch): the type should be 'void' and inferred. |
| 714 _runUserCode<dynamic>(() => action(element), (_) {}, |
| 714 _cancelAndErrorClosure(subscription, future)); | 715 _cancelAndErrorClosure(subscription, future)); |
| 715 }, | 716 }, |
| 716 onError: future._completeError, | 717 onError: future._completeError, |
| 717 onDone: () { | 718 onDone: () { |
| 718 future._complete(null); | 719 future._complete(null); |
| 719 }, | 720 }, |
| 720 cancelOnError: true); | 721 cancelOnError: true); |
| 721 return future; | 722 return future; |
| 722 } | 723 } |
| 723 | 724 |
| (...skipping 438 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1162 _completeWithErrorCallback(future, e, s); | 1163 _completeWithErrorCallback(future, e, s); |
| 1163 } | 1164 } |
| 1164 }, | 1165 }, |
| 1165 cancelOnError: true); | 1166 cancelOnError: true); |
| 1166 return future; | 1167 return future; |
| 1167 } | 1168 } |
| 1168 | 1169 |
| 1169 /** | 1170 /** |
| 1170 * Finds the single element in this stream matching [test]. | 1171 * Finds the single element in this stream matching [test]. |
| 1171 * | 1172 * |
| 1172 * Like [lastMatch], except that it is an error if more than one | 1173 * Like [lastWhere], except that it is an error if more than one |
| 1173 * matching element occurs in the stream. | 1174 * matching element occurs in the stream. |
| 1174 */ | 1175 */ |
| 1175 Future<T> singleWhere(bool test(T element)) { | 1176 Future<T> singleWhere(bool test(T element)) { |
| 1176 _Future<T> future = new _Future<T>(); | 1177 _Future<T> future = new _Future<T>(); |
| 1177 T result = null; | 1178 T result = null; |
| 1178 bool foundResult = false; | 1179 bool foundResult = false; |
| 1179 StreamSubscription subscription; | 1180 StreamSubscription subscription; |
| 1180 subscription = this.listen( | 1181 subscription = this.listen( |
| 1181 (T value) { | 1182 (T value) { |
| 1182 _runUserCode(() => true == test(value), (bool isMatch) { | 1183 _runUserCode(() => true == test(value), (bool isMatch) { |
| (...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1462 } | 1463 } |
| 1463 | 1464 |
| 1464 /** | 1465 /** |
| 1465 * An interface that abstracts creation or handling of [Stream] events. | 1466 * An interface that abstracts creation or handling of [Stream] events. |
| 1466 */ | 1467 */ |
| 1467 abstract class EventSink<T> implements Sink<T> { | 1468 abstract class EventSink<T> implements Sink<T> { |
| 1468 /** Send a data event to a stream. */ | 1469 /** Send a data event to a stream. */ |
| 1469 void add(T event); | 1470 void add(T event); |
| 1470 | 1471 |
| 1471 /** Send an async error to a stream. */ | 1472 /** Send an async error to a stream. */ |
| 1472 void addError(errorEvent, [StackTrace stackTrace]); | 1473 void addError(Object errorEvent, [StackTrace stackTrace]); |
| 1473 | 1474 |
| 1474 /** Close the sink. No further events can be added after closing. */ | 1475 /** Close the sink. No further events can be added after closing. */ |
| 1475 void close(); | 1476 void close(); |
| 1476 } | 1477 } |
| 1477 | 1478 |
| 1478 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1479 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 1479 class StreamView<T> extends Stream<T> { | 1480 class StreamView<T> extends Stream<T> { |
| 1480 final Stream<T> _stream; | 1481 final Stream<T> _stream; |
| 1481 | 1482 |
| 1482 const StreamView(Stream<T> stream) | 1483 const StreamView(Stream<T> stream) |
| (...skipping 306 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1789 } | 1790 } |
| 1790 | 1791 |
| 1791 void addError(error, [StackTrace stackTrace]) { | 1792 void addError(error, [StackTrace stackTrace]) { |
| 1792 _sink.addError(error, stackTrace); | 1793 _sink.addError(error, stackTrace); |
| 1793 } | 1794 } |
| 1794 | 1795 |
| 1795 void close() { | 1796 void close() { |
| 1796 _sink.close(); | 1797 _sink.close(); |
| 1797 } | 1798 } |
| 1798 } | 1799 } |
| OLD | NEW |