Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(689)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2762953003: More minor type improvements in dart:async. (Closed)
Patch Set: More fixes. Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 265 matching lines...) Expand 10 before | Expand all | Expand 10 after
276 * stringStream.transform(new DuplicationTransformer()); 276 * stringStream.transform(new DuplicationTransformer());
277 * 277 *
278 * The resulting stream is a broadcast stream if [source] is. 278 * The resulting stream is a broadcast stream if [source] is.
279 */ 279 */
280 factory Stream.eventTransformed( 280 factory Stream.eventTransformed(
281 Stream source, EventSink mapSink(EventSink<T> sink)) { 281 Stream source, EventSink mapSink(EventSink<T> sink)) {
282 return new _BoundSinkStream(source, mapSink); 282 return new _BoundSinkStream(source, mapSink);
283 } 283 }
284 284
285 /** 285 /**
286 * Reports whether this stream is a broadcast stream. 286 * Whether this stream is a broadcast stream.
287 */ 287 */
288 bool get isBroadcast => false; 288 bool get isBroadcast => false;
289 289
290 /** 290 /**
291 * Returns a multi-subscription stream that produces the same events as this. 291 * Returns a multi-subscription stream that produces the same events as this.
292 * 292 *
293 * The returned stream will subscribe to this stream when its first 293 * The returned stream will subscribe to this stream when its first
294 * subscriber is added, and will stay subscribed until this stream ends, 294 * subscriber is added, and will stay subscribed until this stream ends,
295 * or a callback cancels the subscription. 295 * or a callback cancels the subscription.
296 * 296 *
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after
389 /** 389 /**
390 * Creates a new stream with each data event of this stream asynchronously 390 * Creates a new stream with each data event of this stream asynchronously
391 * mapped to a new event. 391 * mapped to a new event.
392 * 392 *
393 * This acts like [map], except that [convert] may return a [Future], 393 * This acts like [map], except that [convert] may return a [Future],
394 * and in that case, the stream waits for that future to complete before 394 * and in that case, the stream waits for that future to complete before
395 * continuing with its result. 395 * continuing with its result.
396 * 396 *
397 * The returned stream is a broadcast stream if this stream is. 397 * The returned stream is a broadcast stream if this stream is.
398 */ 398 */
399 Stream<E> asyncMap<E>(convert(T event)) { 399 Stream<E> asyncMap<E>(FutureOr<E> convert(T event)) {
400 StreamController<E> controller; 400 StreamController<E> controller;
401 StreamSubscription<T> subscription; 401 StreamSubscription<T> subscription;
402 402
403 void onListen() { 403 void onListen() {
404 final add = controller.add; 404 final add = controller.add;
405 assert(controller is _StreamController || 405 assert(controller is _StreamController ||
406 controller is _BroadcastStreamController); 406 controller is _BroadcastStreamController);
407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
408 final addError = eventSink._addError; 408 final addError = eventSink._addError;
409 subscription = this.listen((T event) { 409 subscription = this.listen((T event) {
410 dynamic newValue; 410 FutureOr<E> newValue;
411 try { 411 try {
412 newValue = convert(event); 412 newValue = convert(event);
413 } catch (e, s) { 413 } catch (e, s) {
414 controller.addError(e, s); 414 controller.addError(e, s);
415 return; 415 return;
416 } 416 }
417 if (newValue is Future) { 417 if (newValue is Future<E>) {
418 subscription.pause(); 418 subscription.pause();
419 newValue 419 newValue
420 .then(add, onError: addError) 420 .then(add, onError: addError)
421 .whenComplete(subscription.resume); 421 .whenComplete(subscription.resume);
422 } else { 422 } else {
423 controller.add(newValue as Object/*=E*/); 423 controller.add(newValue as Object/*=E*/);
424 } 424 }
425 }, onError: addError, onDone: controller.close); 425 }, onError: addError, onDone: controller.close);
426 } 426 }
427 427
(...skipping 275 matching lines...) Expand 10 before | Expand all | Expand 10 after
703 * 703 *
704 * Completes the returned [Future] when all events of the stream 704 * Completes the returned [Future] when all events of the stream
705 * have been processed. Completes the future with an error if the 705 * have been processed. Completes the future with an error if the
706 * stream has an error event, or if [action] throws. 706 * stream has an error event, or if [action] throws.
707 */ 707 */
708 Future forEach(void action(T element)) { 708 Future forEach(void action(T element)) {
709 _Future future = new _Future(); 709 _Future future = new _Future();
710 StreamSubscription subscription; 710 StreamSubscription subscription;
711 subscription = this.listen( 711 subscription = this.listen(
712 (T element) { 712 (T element) {
713 _runUserCode(() => action(element), (_) {}, 713 // TODO(floitsch): the type should be 'void' and inferred.
714 _runUserCode<dynamic>(() => action(element), (_) {},
714 _cancelAndErrorClosure(subscription, future)); 715 _cancelAndErrorClosure(subscription, future));
715 }, 716 },
716 onError: future._completeError, 717 onError: future._completeError,
717 onDone: () { 718 onDone: () {
718 future._complete(null); 719 future._complete(null);
719 }, 720 },
720 cancelOnError: true); 721 cancelOnError: true);
721 return future; 722 return future;
722 } 723 }
723 724
(...skipping 438 matching lines...) Expand 10 before | Expand all | Expand 10 after
1162 _completeWithErrorCallback(future, e, s); 1163 _completeWithErrorCallback(future, e, s);
1163 } 1164 }
1164 }, 1165 },
1165 cancelOnError: true); 1166 cancelOnError: true);
1166 return future; 1167 return future;
1167 } 1168 }
1168 1169
1169 /** 1170 /**
1170 * Finds the single element in this stream matching [test]. 1171 * Finds the single element in this stream matching [test].
1171 * 1172 *
1172 * Like [lastMatch], except that it is an error if more than one 1173 * Like [lastWhere], except that it is an error if more than one
1173 * matching element occurs in the stream. 1174 * matching element occurs in the stream.
1174 */ 1175 */
1175 Future<T> singleWhere(bool test(T element)) { 1176 Future<T> singleWhere(bool test(T element)) {
1176 _Future<T> future = new _Future<T>(); 1177 _Future<T> future = new _Future<T>();
1177 T result = null; 1178 T result = null;
1178 bool foundResult = false; 1179 bool foundResult = false;
1179 StreamSubscription subscription; 1180 StreamSubscription subscription;
1180 subscription = this.listen( 1181 subscription = this.listen(
1181 (T value) { 1182 (T value) {
1182 _runUserCode(() => true == test(value), (bool isMatch) { 1183 _runUserCode(() => true == test(value), (bool isMatch) {
(...skipping 279 matching lines...) Expand 10 before | Expand all | Expand 10 after
1462 } 1463 }
1463 1464
1464 /** 1465 /**
1465 * An interface that abstracts creation or handling of [Stream] events. 1466 * An interface that abstracts creation or handling of [Stream] events.
1466 */ 1467 */
1467 abstract class EventSink<T> implements Sink<T> { 1468 abstract class EventSink<T> implements Sink<T> {
1468 /** Send a data event to a stream. */ 1469 /** Send a data event to a stream. */
1469 void add(T event); 1470 void add(T event);
1470 1471
1471 /** Send an async error to a stream. */ 1472 /** Send an async error to a stream. */
1472 void addError(errorEvent, [StackTrace stackTrace]); 1473 void addError(Object errorEvent, [StackTrace stackTrace]);
1473 1474
1474 /** Close the sink. No further events can be added after closing. */ 1475 /** Close the sink. No further events can be added after closing. */
1475 void close(); 1476 void close();
1476 } 1477 }
1477 1478
1478 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1479 /** [Stream] wrapper that only exposes the [Stream] interface. */
1479 class StreamView<T> extends Stream<T> { 1480 class StreamView<T> extends Stream<T> {
1480 final Stream<T> _stream; 1481 final Stream<T> _stream;
1481 1482
1482 const StreamView(Stream<T> stream) 1483 const StreamView(Stream<T> stream)
(...skipping 306 matching lines...) Expand 10 before | Expand all | Expand 10 after
1789 } 1790 }
1790 1791
1791 void addError(error, [StackTrace stackTrace]) { 1792 void addError(error, [StackTrace stackTrace]) {
1792 _sink.addError(error, stackTrace); 1793 _sink.addError(error, stackTrace);
1793 } 1794 }
1794 1795
1795 void close() { 1796 void close() {
1796 _sink.close(); 1797 _sink.close();
1797 } 1798 }
1798 } 1799 }
OLDNEW
« no previous file with comments | « sdk/lib/async/future_impl.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698