OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
276 * stringStream.transform(new DuplicationTransformer()); | 276 * stringStream.transform(new DuplicationTransformer()); |
277 * | 277 * |
278 * The resulting stream is a broadcast stream if [source] is. | 278 * The resulting stream is a broadcast stream if [source] is. |
279 */ | 279 */ |
280 factory Stream.eventTransformed( | 280 factory Stream.eventTransformed( |
281 Stream source, EventSink mapSink(EventSink<T> sink)) { | 281 Stream source, EventSink mapSink(EventSink<T> sink)) { |
282 return new _BoundSinkStream(source, mapSink); | 282 return new _BoundSinkStream(source, mapSink); |
283 } | 283 } |
284 | 284 |
285 /** | 285 /** |
286 * Reports whether this stream is a broadcast stream. | 286 * Whether this stream is a broadcast stream. |
287 */ | 287 */ |
288 bool get isBroadcast => false; | 288 bool get isBroadcast => false; |
289 | 289 |
290 /** | 290 /** |
291 * Returns a multi-subscription stream that produces the same events as this. | 291 * Returns a multi-subscription stream that produces the same events as this. |
292 * | 292 * |
293 * The returned stream will subscribe to this stream when its first | 293 * The returned stream will subscribe to this stream when its first |
294 * subscriber is added, and will stay subscribed until this stream ends, | 294 * subscriber is added, and will stay subscribed until this stream ends, |
295 * or a callback cancels the subscription. | 295 * or a callback cancels the subscription. |
296 * | 296 * |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
389 /** | 389 /** |
390 * Creates a new stream with each data event of this stream asynchronously | 390 * Creates a new stream with each data event of this stream asynchronously |
391 * mapped to a new event. | 391 * mapped to a new event. |
392 * | 392 * |
393 * This acts like [map], except that [convert] may return a [Future], | 393 * This acts like [map], except that [convert] may return a [Future], |
394 * and in that case, the stream waits for that future to complete before | 394 * and in that case, the stream waits for that future to complete before |
395 * continuing with its result. | 395 * continuing with its result. |
396 * | 396 * |
397 * The returned stream is a broadcast stream if this stream is. | 397 * The returned stream is a broadcast stream if this stream is. |
398 */ | 398 */ |
399 Stream<E> asyncMap<E>(convert(T event)) { | 399 Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) { |
400 StreamController<E> controller; | 400 StreamController<E> controller; |
401 StreamSubscription<T> subscription; | 401 StreamSubscription<T> subscription; |
402 | 402 |
403 void onListen() { | 403 void onListen() { |
404 final add = controller.add; | 404 final add = controller.add; |
405 assert(controller is _StreamController || | 405 assert(controller is _StreamController || |
406 controller is _BroadcastStreamController); | 406 controller is _BroadcastStreamController); |
407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; | 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
408 final addError = eventSink._addError; | 408 final addError = eventSink._addError; |
409 subscription = this.listen((T event) { | 409 subscription = this.listen((T event) { |
410 dynamic newValue; | 410 FutureOr<E> newValue; |
411 try { | 411 try { |
412 newValue = convert(event); | 412 newValue = convert(event); |
413 } catch (e, s) { | 413 } catch (e, s) { |
414 controller.addError(e, s); | 414 controller.addError(e, s); |
415 return; | 415 return; |
416 } | 416 } |
417 if (newValue is Future) { | 417 if (newValue is Future<E>) { |
418 subscription.pause(); | 418 subscription.pause(); |
419 newValue | 419 newValue |
420 .then(add, onError: addError) | 420 .then(add, onError: addError) |
421 .whenComplete(subscription.resume); | 421 .whenComplete(subscription.resume); |
422 } else { | 422 } else { |
423 controller.add(newValue as Object/*=E*/); | 423 controller.add(newValue as Object/*=E*/); |
424 } | 424 } |
425 }, onError: addError, onDone: controller.close); | 425 }, onError: addError, onDone: controller.close); |
426 } | 426 } |
427 | 427 |
(...skipping 275 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
703 * | 703 * |
704 * Completes the returned [Future] when all events of the stream | 704 * Completes the returned [Future] when all events of the stream |
705 * have been processed. Completes the future with an error if the | 705 * have been processed. Completes the future with an error if the |
706 * stream has an error event, or if [action] throws. | 706 * stream has an error event, or if [action] throws. |
707 */ | 707 */ |
708 Future forEach(void action(T element)) { | 708 Future forEach(void action(T element)) { |
709 _Future future = new _Future(); | 709 _Future future = new _Future(); |
710 StreamSubscription subscription; | 710 StreamSubscription subscription; |
711 subscription = this.listen( | 711 subscription = this.listen( |
712 (T element) { | 712 (T element) { |
713 _runUserCode(() => action(element), (_) {}, | 713 // TODO(floitsch): the type should be 'void' and inferred. |
| 714 _runUserCode<dynamic>(() => action(element), (_) {}, |
714 _cancelAndErrorClosure(subscription, future)); | 715 _cancelAndErrorClosure(subscription, future)); |
715 }, | 716 }, |
716 onError: future._completeError, | 717 onError: future._completeError, |
717 onDone: () { | 718 onDone: () { |
718 future._complete(null); | 719 future._complete(null); |
719 }, | 720 }, |
720 cancelOnError: true); | 721 cancelOnError: true); |
721 return future; | 722 return future; |
722 } | 723 } |
723 | 724 |
(...skipping 438 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1162 _completeWithErrorCallback(future, e, s); | 1163 _completeWithErrorCallback(future, e, s); |
1163 } | 1164 } |
1164 }, | 1165 }, |
1165 cancelOnError: true); | 1166 cancelOnError: true); |
1166 return future; | 1167 return future; |
1167 } | 1168 } |
1168 | 1169 |
1169 /** | 1170 /** |
1170 * Finds the single element in this stream matching [test]. | 1171 * Finds the single element in this stream matching [test]. |
1171 * | 1172 * |
1172 * Like [lastMatch], except that it is an error if more than one | 1173 * Like [lastWhere], except that it is an error if more than one |
1173 * matching element occurs in the stream. | 1174 * matching element occurs in the stream. |
1174 */ | 1175 */ |
1175 Future<T> singleWhere(bool test(T element)) { | 1176 Future<T> singleWhere(bool test(T element)) { |
1176 _Future<T> future = new _Future<T>(); | 1177 _Future<T> future = new _Future<T>(); |
1177 T result = null; | 1178 T result = null; |
1178 bool foundResult = false; | 1179 bool foundResult = false; |
1179 StreamSubscription subscription; | 1180 StreamSubscription subscription; |
1180 subscription = this.listen( | 1181 subscription = this.listen( |
1181 (T value) { | 1182 (T value) { |
1182 _runUserCode(() => true == test(value), (bool isMatch) { | 1183 _runUserCode(() => true == test(value), (bool isMatch) { |
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1462 } | 1463 } |
1463 | 1464 |
1464 /** | 1465 /** |
1465 * An interface that abstracts creation or handling of [Stream] events. | 1466 * An interface that abstracts creation or handling of [Stream] events. |
1466 */ | 1467 */ |
1467 abstract class EventSink<T> implements Sink<T> { | 1468 abstract class EventSink<T> implements Sink<T> { |
1468 /** Send a data event to a stream. */ | 1469 /** Send a data event to a stream. */ |
1469 void add(T event); | 1470 void add(T event); |
1470 | 1471 |
1471 /** Send an async error to a stream. */ | 1472 /** Send an async error to a stream. */ |
1472 void addError(errorEvent, [StackTrace stackTrace]); | 1473 void addError(Object errorEvent, [StackTrace stackTrace]); |
1473 | 1474 |
1474 /** Close the sink. No further events can be added after closing. */ | 1475 /** Close the sink. No further events can be added after closing. */ |
1475 void close(); | 1476 void close(); |
1476 } | 1477 } |
1477 | 1478 |
1478 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1479 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
1479 class StreamView<T> extends Stream<T> { | 1480 class StreamView<T> extends Stream<T> { |
1480 final Stream<T> _stream; | 1481 final Stream<T> _stream; |
1481 | 1482 |
1482 const StreamView(Stream<T> stream) | 1483 const StreamView(Stream<T> stream) |
(...skipping 306 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1789 } | 1790 } |
1790 | 1791 |
1791 void addError(error, [StackTrace stackTrace]) { | 1792 void addError(error, [StackTrace stackTrace]) { |
1792 _sink.addError(error, stackTrace); | 1793 _sink.addError(error, stackTrace); |
1793 } | 1794 } |
1794 | 1795 |
1795 void close() { | 1796 void close() { |
1796 _sink.close(); | 1797 _sink.close(); |
1797 } | 1798 } |
1798 } | 1799 } |
OLD | NEW |