Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(515)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 2754013002: Format all dart: library files (Closed)
Patch Set: Format all dart: library files Created 3 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « sdk/lib/async/schedule_microtask.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after
97 * 97 *
98 * When the future completes, the stream will fire one event, either 98 * When the future completes, the stream will fire one event, either
99 * data or error, and then close with a done-event. 99 * data or error, and then close with a done-event.
100 */ 100 */
101 factory Stream.fromFuture(Future<T> future) { 101 factory Stream.fromFuture(Future<T> future) {
102 // Use the controller's buffering to fill in the value even before 102 // Use the controller's buffering to fill in the value even before
103 // the stream has a listener. For a single value, it's not worth it 103 // the stream has a listener. For a single value, it's not worth it
104 // to wait for a listener before doing the `then` on the future. 104 // to wait for a listener before doing the `then` on the future.
105 _StreamController<T> controller = new StreamController<T>(sync: true); 105 _StreamController<T> controller = new StreamController<T>(sync: true);
106 future.then((value) { 106 future.then((value) {
107 controller._add(value); 107 controller._add(value);
108 controller._closeUnchecked(); 108 controller._closeUnchecked();
109 }, 109 }, onError: (error, stackTrace) {
110 onError: (error, stackTrace) { 110 controller._addError(error, stackTrace);
111 controller._addError(error, stackTrace); 111 controller._closeUnchecked();
112 controller._closeUnchecked(); 112 });
113 });
114 return controller.stream; 113 return controller.stream;
115 } 114 }
116 115
117 /** 116 /**
118 * Create a stream from a group of futures. 117 * Create a stream from a group of futures.
119 * 118 *
120 * The stream reports the results of the futures on the stream in the order 119 * The stream reports the results of the futures on the stream in the order
121 * in which the futures complete. 120 * in which the futures complete.
122 * 121 *
123 * If some futures have completed before calling `Stream.fromFutures`, 122 * If some futures have completed before calling `Stream.fromFutures`,
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after
174 /** 173 /**
175 * Creates a stream that repeatedly emits events at [period] intervals. 174 * Creates a stream that repeatedly emits events at [period] intervals.
176 * 175 *
177 * The event values are computed by invoking [computation]. The argument to 176 * The event values are computed by invoking [computation]. The argument to
178 * this callback is an integer that starts with 0 and is incremented for 177 * this callback is an integer that starts with 0 and is incremented for
179 * every event. 178 * every event.
180 * 179 *
181 * If [computation] is omitted the event values will all be `null`. 180 * If [computation] is omitted the event values will all be `null`.
182 */ 181 */
183 factory Stream.periodic(Duration period, 182 factory Stream.periodic(Duration period,
184 [T computation(int computationCount)]) { 183 [T computation(int computationCount)]) {
185 Timer timer; 184 Timer timer;
186 int computationCount = 0; 185 int computationCount = 0;
187 StreamController<T> controller; 186 StreamController<T> controller;
188 // Counts the time that the Stream was running (and not paused). 187 // Counts the time that the Stream was running (and not paused).
189 Stopwatch watch = new Stopwatch(); 188 Stopwatch watch = new Stopwatch();
190 189
191 void sendEvent() { 190 void sendEvent() {
192 watch.reset(); 191 watch.reset();
193 T data; 192 T data;
194 if (computation != null) { 193 if (computation != null) {
195 try { 194 try {
196 data = computation(computationCount++); 195 data = computation(computationCount++);
197 } catch (e, s) { 196 } catch (e, s) {
198 controller.addError(e, s); 197 controller.addError(e, s);
199 return; 198 return;
200 } 199 }
201 } 200 }
202 controller.add(data); 201 controller.add(data);
203 } 202 }
204 203
205 void startPeriodicTimer() { 204 void startPeriodicTimer() {
206 assert(timer == null); 205 assert(timer == null);
207 timer = new Timer.periodic(period, (Timer timer) { 206 timer = new Timer.periodic(period, (Timer timer) {
208 sendEvent(); 207 sendEvent();
209 }); 208 });
210 } 209 }
211 210
212 controller = new StreamController<T>(sync: true, 211 controller = new StreamController<T>(
212 sync: true,
213 onListen: () { 213 onListen: () {
214 watch.start(); 214 watch.start();
215 startPeriodicTimer(); 215 startPeriodicTimer();
216 }, 216 },
217 onPause: () { 217 onPause: () {
218 timer.cancel(); 218 timer.cancel();
219 timer = null; 219 timer = null;
220 watch.stop(); 220 watch.stop();
221 }, 221 },
222 onResume: () { 222 onResume: () {
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
270 * // Some generic types ommitted for brevety. 270 * // Some generic types ommitted for brevety.
271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( 271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed(
272 * stream, 272 * stream,
273 * (EventSink sink) => new DuplicationSink(sink)); 273 * (EventSink sink) => new DuplicationSink(sink));
274 * } 274 * }
275 * 275 *
276 * stringStream.transform(new DuplicationTransformer()); 276 * stringStream.transform(new DuplicationTransformer());
277 * 277 *
278 * The resulting stream is a broadcast stream if [source] is. 278 * The resulting stream is a broadcast stream if [source] is.
279 */ 279 */
280 factory Stream.eventTransformed(Stream source, 280 factory Stream.eventTransformed(
281 EventSink mapSink(EventSink<T> sink)) { 281 Stream source, EventSink mapSink(EventSink<T> sink)) {
282 return new _BoundSinkStream(source, mapSink); 282 return new _BoundSinkStream(source, mapSink);
283 } 283 }
284 284
285 /** 285 /**
286 * Reports whether this stream is a broadcast stream. 286 * Reports whether this stream is a broadcast stream.
287 */ 287 */
288 bool get isBroadcast => false; 288 bool get isBroadcast => false;
289 289
290 /** 290 /**
291 * Returns a multi-subscription stream that produces the same events as this. 291 * Returns a multi-subscription stream that produces the same events as this.
292 * 292 *
293 * The returned stream will subscribe to this stream when its first 293 * The returned stream will subscribe to this stream when its first
294 * subscriber is added, and will stay subscribed until this stream ends, 294 * subscriber is added, and will stay subscribed until this stream ends,
295 * or a callback cancels the subscription. 295 * or a callback cancels the subscription.
296 * 296 *
297 * If [onListen] is provided, it is called with a subscription-like object 297 * If [onListen] is provided, it is called with a subscription-like object
298 * that represents the underlying subscription to this stream. It is 298 * that represents the underlying subscription to this stream. It is
299 * possible to pause, resume or cancel the subscription during the call 299 * possible to pause, resume or cancel the subscription during the call
300 * to [onListen]. It is not possible to change the event handlers, including 300 * to [onListen]. It is not possible to change the event handlers, including
301 * using [StreamSubscription.asFuture]. 301 * using [StreamSubscription.asFuture].
302 * 302 *
303 * If [onCancel] is provided, it is called in a similar way to [onListen] 303 * If [onCancel] is provided, it is called in a similar way to [onListen]
304 * when the returned stream stops having listener. If it later gets 304 * when the returned stream stops having listener. If it later gets
305 * a new listener, the [onListen] function is called again. 305 * a new listener, the [onListen] function is called again.
306 * 306 *
307 * Use the callbacks, for example, for pausing the underlying subscription 307 * Use the callbacks, for example, for pausing the underlying subscription
308 * while having no subscribers to prevent losing events, or canceling the 308 * while having no subscribers to prevent losing events, or canceling the
309 * subscription when there are no listeners. 309 * subscription when there are no listeners.
310 */ 310 */
311 Stream<T> asBroadcastStream({ 311 Stream<T> asBroadcastStream(
312 void onListen(StreamSubscription<T> subscription), 312 {void onListen(StreamSubscription<T> subscription),
313 void onCancel(StreamSubscription<T> subscription) }) { 313 void onCancel(StreamSubscription<T> subscription)}) {
314 return new _AsBroadcastStream<T>(this, onListen, onCancel); 314 return new _AsBroadcastStream<T>(this, onListen, onCancel);
315 } 315 }
316 316
317 /** 317 /**
318 * Adds a subscription to this stream. 318 * Adds a subscription to this stream.
319 * 319 *
320 * Returns a [StreamSubscription] which handles events from the stream using 320 * Returns a [StreamSubscription] which handles events from the stream using
321 * the provided [onData], [onError] and [onDone] handlers. 321 * the provided [onData], [onError] and [onDone] handlers.
322 * The handlers can be changed on the subscription, but they start out 322 * The handlers can be changed on the subscription, but they start out
323 * as the provided functions. 323 * as the provided functions.
(...skipping 19 matching lines...) Expand all
343 * called. If [onDone] is `null`, nothing happens. 343 * called. If [onDone] is `null`, nothing happens.
344 * 344 *
345 * If [cancelOnError] is true, the subscription is automatically cancelled 345 * If [cancelOnError] is true, the subscription is automatically cancelled
346 * when the first error event is delivered. The default is `false`. 346 * when the first error event is delivered. The default is `false`.
347 * 347 *
348 * While a subscription is paused, or when it has been cancelled, 348 * While a subscription is paused, or when it has been cancelled,
349 * the subscription doesn't receive events and none of the 349 * the subscription doesn't receive events and none of the
350 * event handler functions are called. 350 * event handler functions are called.
351 */ 351 */
352 StreamSubscription<T> listen(void onData(T event), 352 StreamSubscription<T> listen(void onData(T event),
353 { Function onError, 353 {Function onError, void onDone(), bool cancelOnError});
354 void onDone(),
355 bool cancelOnError});
356 354
357 /** 355 /**
358 * Creates a new stream from this stream that discards some data events. 356 * Creates a new stream from this stream that discards some data events.
359 * 357 *
360 * The new stream sends the same error and done events as this stream, 358 * The new stream sends the same error and done events as this stream,
361 * but it only sends the data events that satisfy the [test]. 359 * but it only sends the data events that satisfy the [test].
362 * 360 *
363 * The returned stream is a broadcast stream if this stream is. 361 * The returned stream is a broadcast stream if this stream is.
364 * If a broadcast stream is listened to more than once, each subscription 362 * If a broadcast stream is listened to more than once, each subscription
365 * will individually perform the `test`. 363 * will individually perform the `test`.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
398 * 396 *
399 * The returned stream is a broadcast stream if this stream is. 397 * The returned stream is a broadcast stream if this stream is.
400 */ 398 */
401 Stream<E> asyncMap<E>(convert(T event)) { 399 Stream<E> asyncMap<E>(convert(T event)) {
402 StreamController<E> controller; 400 StreamController<E> controller;
403 StreamSubscription<T> subscription; 401 StreamSubscription<T> subscription;
404 402
405 void onListen() { 403 void onListen() {
406 final add = controller.add; 404 final add = controller.add;
407 assert(controller is _StreamController || 405 assert(controller is _StreamController ||
408 controller is _BroadcastStreamController); 406 controller is _BroadcastStreamController);
409 final _EventSink<E> eventSink = 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
410 controller as Object /*=_EventSink<E>*/;
411 final addError = eventSink._addError; 408 final addError = eventSink._addError;
412 subscription = this.listen( 409 subscription = this.listen((T event) {
413 (T event) { 410 dynamic newValue;
414 dynamic newValue; 411 try {
415 try { 412 newValue = convert(event);
416 newValue = convert(event); 413 } catch (e, s) {
417 } catch (e, s) { 414 controller.addError(e, s);
418 controller.addError(e, s); 415 return;
419 return; 416 }
420 } 417 if (newValue is Future) {
421 if (newValue is Future) { 418 subscription.pause();
422 subscription.pause(); 419 newValue
423 newValue.then(add, onError: addError) 420 .then(add, onError: addError)
424 .whenComplete(subscription.resume); 421 .whenComplete(subscription.resume);
425 } else { 422 } else {
426 controller.add(newValue as Object/*=E*/); 423 controller.add(newValue as Object/*=E*/);
427 } 424 }
428 }, 425 }, onError: addError, onDone: controller.close);
429 onError: addError,
430 onDone: controller.close
431 );
432 } 426 }
433 427
434 if (this.isBroadcast) { 428 if (this.isBroadcast) {
435 controller = new StreamController<E>.broadcast( 429 controller = new StreamController<E>.broadcast(
436 onListen: onListen, 430 onListen: onListen,
437 onCancel: () { subscription.cancel(); }, 431 onCancel: () {
438 sync: true 432 subscription.cancel();
439 ); 433 },
434 sync: true);
440 } else { 435 } else {
441 controller = new StreamController<E>( 436 controller = new StreamController<E>(
442 onListen: onListen, 437 onListen: onListen,
443 onPause: () { subscription.pause(); }, 438 onPause: () {
444 onResume: () { subscription.resume(); }, 439 subscription.pause();
445 onCancel: () => subscription.cancel(), 440 },
446 sync: true 441 onResume: () {
447 ); 442 subscription.resume();
443 },
444 onCancel: () => subscription.cancel(),
445 sync: true);
448 } 446 }
449 return controller.stream; 447 return controller.stream;
450 } 448 }
451 449
452 /** 450 /**
453 * Creates a new stream with the events of a stream per original event. 451 * Creates a new stream with the events of a stream per original event.
454 * 452 *
455 * This acts like [expand], except that [convert] returns a [Stream] 453 * This acts like [expand], except that [convert] returns a [Stream]
456 * instead of an [Iterable]. 454 * instead of an [Iterable].
457 * The events of the returned stream becomes the events of the returned 455 * The events of the returned stream becomes the events of the returned
458 * stream, in the order they are produced. 456 * stream, in the order they are produced.
459 * 457 *
460 * If [convert] returns `null`, no value is put on the output stream, 458 * If [convert] returns `null`, no value is put on the output stream,
461 * just as if it returned an empty stream. 459 * just as if it returned an empty stream.
462 * 460 *
463 * The returned stream is a broadcast stream if this stream is. 461 * The returned stream is a broadcast stream if this stream is.
464 */ 462 */
465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { 463 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) {
466 StreamController<E> controller; 464 StreamController<E> controller;
467 StreamSubscription<T> subscription; 465 StreamSubscription<T> subscription;
468 void onListen() { 466 void onListen() {
469 assert(controller is _StreamController || 467 assert(controller is _StreamController ||
470 controller is _BroadcastStreamController); 468 controller is _BroadcastStreamController);
471 final _EventSink<E> eventSink = 469 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/;
472 controller as Object /*=_EventSink<E>*/; 470 subscription = this.listen((T event) {
473 subscription = this.listen( 471 Stream<E> newStream;
474 (T event) { 472 try {
475 Stream<E> newStream; 473 newStream = convert(event);
476 try { 474 } catch (e, s) {
477 newStream = convert(event); 475 controller.addError(e, s);
478 } catch (e, s) { 476 return;
479 controller.addError(e, s); 477 }
480 return; 478 if (newStream != null) {
481 } 479 subscription.pause();
482 if (newStream != null) { 480 controller.addStream(newStream).whenComplete(subscription.resume);
483 subscription.pause(); 481 }
484 controller.addStream(newStream) 482 },
485 .whenComplete(subscription.resume); 483 onError: eventSink._addError, // Avoid Zone error replacement.
486 } 484 onDone: controller.close);
487 },
488 onError: eventSink._addError, // Avoid Zone error replacement.
489 onDone: controller.close
490 );
491 } 485 }
486
492 if (this.isBroadcast) { 487 if (this.isBroadcast) {
493 controller = new StreamController<E>.broadcast( 488 controller = new StreamController<E>.broadcast(
494 onListen: onListen, 489 onListen: onListen,
495 onCancel: () { subscription.cancel(); }, 490 onCancel: () {
496 sync: true 491 subscription.cancel();
497 ); 492 },
493 sync: true);
498 } else { 494 } else {
499 controller = new StreamController<E>( 495 controller = new StreamController<E>(
500 onListen: onListen, 496 onListen: onListen,
501 onPause: () { subscription.pause(); }, 497 onPause: () {
502 onResume: () { subscription.resume(); }, 498 subscription.pause();
503 onCancel: () => subscription.cancel(), 499 },
504 sync: true 500 onResume: () {
505 ); 501 subscription.resume();
502 },
503 onCancel: () => subscription.cancel(),
504 sync: true);
506 } 505 }
507 return controller.stream; 506 return controller.stream;
508 } 507 }
509 508
510 /** 509 /**
511 * Creates a wrapper Stream that intercepts some errors from this stream. 510 * Creates a wrapper Stream that intercepts some errors from this stream.
512 * 511 *
513 * If this stream sends an error that matches [test], then it is intercepted 512 * If this stream sends an error that matches [test], then it is intercepted
514 * by the [onError] function. 513 * by the [onError] function.
515 * 514 *
(...skipping 12 matching lines...) Expand all
528 * or simply return to make the stream forget the error. 527 * or simply return to make the stream forget the error.
529 * 528 *
530 * If you need to transform an error into a data event, use the more generic 529 * If you need to transform an error into a data event, use the more generic
531 * [Stream.transform] to handle the event by writing a data event to 530 * [Stream.transform] to handle the event by writing a data event to
532 * the output sink. 531 * the output sink.
533 * 532 *
534 * The returned stream is a broadcast stream if this stream is. 533 * The returned stream is a broadcast stream if this stream is.
535 * If a broadcast stream is listened to more than once, each subscription 534 * If a broadcast stream is listened to more than once, each subscription
536 * will individually perform the `test` and handle the error. 535 * will individually perform the `test` and handle the error.
537 */ 536 */
538 Stream<T> handleError(Function onError, { bool test(error) }) { 537 Stream<T> handleError(Function onError, {bool test(error)}) {
539 return new _HandleErrorStream<T>(this, onError, test); 538 return new _HandleErrorStream<T>(this, onError, test);
540 } 539 }
541 540
542 /** 541 /**
543 * Creates a new stream from this stream that converts each element 542 * Creates a new stream from this stream that converts each element
544 * into zero or more events. 543 * into zero or more events.
545 * 544 *
546 * Each incoming event is converted to an [Iterable] of new events, 545 * Each incoming event is converted to an [Iterable] of new events,
547 * and each of these new events are then sent by the returned stream 546 * and each of these new events are then sent by the returned stream
548 * in order. 547 * in order.
(...skipping 29 matching lines...) Expand all
578 } 577 }
579 578
580 /** 579 /**
581 * Chains this stream as the input of the provided [StreamTransformer]. 580 * Chains this stream as the input of the provided [StreamTransformer].
582 * 581 *
583 * Returns the result of [:streamTransformer.bind:] itself. 582 * Returns the result of [:streamTransformer.bind:] itself.
584 * 583 *
585 * The `streamTransformer` can decide whether it wants to return a 584 * The `streamTransformer` can decide whether it wants to return a
586 * broadcast stream or not. 585 * broadcast stream or not.
587 */ 586 */
588 Stream<S> transform<S>( 587 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
589 StreamTransformer<T, S > streamTransformer) {
590 return streamTransformer.bind(this); 588 return streamTransformer.bind(this);
591 } 589 }
592 590
593 /** 591 /**
594 * Reduces a sequence of values by repeatedly applying [combine]. 592 * Reduces a sequence of values by repeatedly applying [combine].
595 */ 593 */
596 Future<T> reduce(T combine(T previous, T element)) { 594 Future<T> reduce(T combine(T previous, T element)) {
597 _Future<T> result = new _Future<T>(); 595 _Future<T> result = new _Future<T>();
598 bool seenFirst = false; 596 bool seenFirst = false;
599 T value; 597 T value;
600 StreamSubscription subscription; 598 StreamSubscription subscription;
601 subscription = this.listen( 599 subscription = this.listen(
602 (T element) { 600 (T element) {
603 if (seenFirst) { 601 if (seenFirst) {
604 _runUserCode(() => combine(value, element), 602 _runUserCode(() => combine(value, element), (T newValue) {
605 (T newValue) { value = newValue; }, 603 value = newValue;
606 _cancelAndErrorClosure(subscription, result)); 604 }, _cancelAndErrorClosure(subscription, result));
607 } else { 605 } else {
608 value = element; 606 value = element;
609 seenFirst = true; 607 seenFirst = true;
610 }
611 },
612 onError: result._completeError,
613 onDone: () {
614 if (!seenFirst) {
615 try {
616 throw IterableElementError.noElement();
617 } catch (e, s) {
618 _completeWithErrorCallback(result, e, s);
619 } 608 }
620 } else { 609 },
621 result._complete(value); 610 onError: result._completeError,
622 } 611 onDone: () {
623 }, 612 if (!seenFirst) {
624 cancelOnError: true 613 try {
625 ); 614 throw IterableElementError.noElement();
615 } catch (e, s) {
616 _completeWithErrorCallback(result, e, s);
617 }
618 } else {
619 result._complete(value);
620 }
621 },
622 cancelOnError: true);
626 return result; 623 return result;
627 } 624 }
628 625
629 /** Reduces a sequence of values by repeatedly applying [combine]. */ 626 /** Reduces a sequence of values by repeatedly applying [combine]. */
630 Future<S> fold<S>(S initialValue, 627 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) {
631 S combine(S previous, T element)) {
632
633 _Future<S> result = new _Future<S>(); 628 _Future<S> result = new _Future<S>();
634 S value = initialValue; 629 S value = initialValue;
635 StreamSubscription subscription; 630 StreamSubscription subscription;
636 subscription = this.listen( 631 subscription = this.listen((T element) {
637 (T element) { 632 _runUserCode(() => combine(value, element), (S newValue) {
638 _runUserCode( 633 value = newValue;
639 () => combine(value, element), 634 }, _cancelAndErrorClosure(subscription, result));
640 (S newValue) { value = newValue; }, 635 }, onError: (e, st) {
641 _cancelAndErrorClosure(subscription, result) 636 result._completeError(e, st);
642 ); 637 }, onDone: () {
643 }, 638 result._complete(value);
644 onError: (e, st) { 639 }, cancelOnError: true);
645 result._completeError(e, st);
646 },
647 onDone: () {
648 result._complete(value);
649 },
650 cancelOnError: true);
651 return result; 640 return result;
652 } 641 }
653 642
654 /** 643 /**
655 * Collects string of data events' string representations. 644 * Collects string of data events' string representations.
656 * 645 *
657 * If [separator] is provided, it is inserted between any two 646 * If [separator] is provided, it is inserted between any two
658 * elements. 647 * elements.
659 * 648 *
660 * Any error in the stream causes the future to complete with that 649 * Any error in the stream causes the future to complete with that
661 * error. Otherwise it completes with the collected string when 650 * error. Otherwise it completes with the collected string when
662 * the "done" event arrives. 651 * the "done" event arrives.
663 */ 652 */
664 Future<String> join([String separator = ""]) { 653 Future<String> join([String separator = ""]) {
665 _Future<String> result = new _Future<String>(); 654 _Future<String> result = new _Future<String>();
666 StringBuffer buffer = new StringBuffer(); 655 StringBuffer buffer = new StringBuffer();
667 StreamSubscription subscription; 656 StreamSubscription subscription;
668 bool first = true; 657 bool first = true;
669 subscription = this.listen( 658 subscription = this.listen((T element) {
670 (T element) { 659 if (!first) {
671 if (!first) { 660 buffer.write(separator);
672 buffer.write(separator); 661 }
673 } 662 first = false;
674 first = false; 663 try {
675 try { 664 buffer.write(element);
676 buffer.write(element); 665 } catch (e, s) {
677 } catch (e, s) { 666 _cancelAndErrorWithReplacement(subscription, result, e, s);
678 _cancelAndErrorWithReplacement(subscription, result, e, s); 667 }
679 } 668 }, onError: (e) {
680 }, 669 result._completeError(e);
681 onError: (e) { 670 }, onDone: () {
682 result._completeError(e); 671 result._complete(buffer.toString());
683 }, 672 }, cancelOnError: true);
684 onDone: () {
685 result._complete(buffer.toString());
686 },
687 cancelOnError: true);
688 return result; 673 return result;
689 } 674 }
690 675
691 /** 676 /**
692 * Checks whether [needle] occurs in the elements provided by this stream. 677 * Checks whether [needle] occurs in the elements provided by this stream.
693 * 678 *
694 * Completes the [Future] when the answer is known. 679 * Completes the [Future] when the answer is known.
695 * If this stream reports an error, the [Future] will report that error. 680 * If this stream reports an error, the [Future] will report that error.
696 */ 681 */
697 Future<bool> contains(Object needle) { 682 Future<bool> contains(Object needle) {
698 _Future<bool> future = new _Future<bool>(); 683 _Future<bool> future = new _Future<bool>();
699 StreamSubscription subscription; 684 StreamSubscription subscription;
700 subscription = this.listen( 685 subscription = this.listen(
701 (T element) { 686 (T element) {
702 _runUserCode( 687 _runUserCode(() => (element == needle), (bool isMatch) {
703 () => (element == needle), 688 if (isMatch) {
704 (bool isMatch) { 689 _cancelAndValue(subscription, future, true);
705 if (isMatch) { 690 }
706 _cancelAndValue(subscription, future, true); 691 }, _cancelAndErrorClosure(subscription, future));
707 }
708 },
709 _cancelAndErrorClosure(subscription, future)
710 );
711 }, 692 },
712 onError: future._completeError, 693 onError: future._completeError,
713 onDone: () { 694 onDone: () {
714 future._complete(false); 695 future._complete(false);
715 }, 696 },
716 cancelOnError: true); 697 cancelOnError: true);
717 return future; 698 return future;
718 } 699 }
719 700
720 /** 701 /**
721 * Executes [action] on each data event of the stream. 702 * Executes [action] on each data event of the stream.
722 * 703 *
723 * Completes the returned [Future] when all events of the stream 704 * Completes the returned [Future] when all events of the stream
724 * have been processed. Completes the future with an error if the 705 * have been processed. Completes the future with an error if the
725 * stream has an error event, or if [action] throws. 706 * stream has an error event, or if [action] throws.
726 */ 707 */
727 Future forEach(void action(T element)) { 708 Future forEach(void action(T element)) {
728 _Future future = new _Future(); 709 _Future future = new _Future();
729 StreamSubscription subscription; 710 StreamSubscription subscription;
730 subscription = this.listen( 711 subscription = this.listen(
731 (T element) { 712 (T element) {
732 _runUserCode( 713 _runUserCode(() => action(element), (_) {},
733 () => action(element), 714 _cancelAndErrorClosure(subscription, future));
734 (_) {},
735 _cancelAndErrorClosure(subscription, future)
736 );
737 }, 715 },
738 onError: future._completeError, 716 onError: future._completeError,
739 onDone: () { 717 onDone: () {
740 future._complete(null); 718 future._complete(null);
741 }, 719 },
742 cancelOnError: true); 720 cancelOnError: true);
743 return future; 721 return future;
744 } 722 }
745 723
746 /** 724 /**
747 * Checks whether [test] accepts all elements provided by this stream. 725 * Checks whether [test] accepts all elements provided by this stream.
748 * 726 *
749 * Completes the [Future] when the answer is known. 727 * Completes the [Future] when the answer is known.
750 * If this stream reports an error, the [Future] will report that error. 728 * If this stream reports an error, the [Future] will report that error.
751 */ 729 */
752 Future<bool> every(bool test(T element)) { 730 Future<bool> every(bool test(T element)) {
753 _Future<bool> future = new _Future<bool>(); 731 _Future<bool> future = new _Future<bool>();
754 StreamSubscription subscription; 732 StreamSubscription subscription;
755 subscription = this.listen( 733 subscription = this.listen(
756 (T element) { 734 (T element) {
757 _runUserCode( 735 _runUserCode(() => test(element), (bool isMatch) {
758 () => test(element), 736 if (!isMatch) {
759 (bool isMatch) { 737 _cancelAndValue(subscription, future, false);
760 if (!isMatch) { 738 }
761 _cancelAndValue(subscription, future, false); 739 }, _cancelAndErrorClosure(subscription, future));
762 }
763 },
764 _cancelAndErrorClosure(subscription, future)
765 );
766 }, 740 },
767 onError: future._completeError, 741 onError: future._completeError,
768 onDone: () { 742 onDone: () {
769 future._complete(true); 743 future._complete(true);
770 }, 744 },
771 cancelOnError: true); 745 cancelOnError: true);
772 return future; 746 return future;
773 } 747 }
774 748
775 /** 749 /**
776 * Checks whether [test] accepts any element provided by this stream. 750 * Checks whether [test] accepts any element provided by this stream.
777 * 751 *
778 * Completes the [Future] when the answer is known. 752 * Completes the [Future] when the answer is known.
779 * 753 *
780 * If this stream reports an error, the [Future] reports that error. 754 * If this stream reports an error, the [Future] reports that error.
781 * 755 *
782 * Stops listening to the stream after the first matching element has been 756 * Stops listening to the stream after the first matching element has been
783 * found. 757 * found.
784 * 758 *
785 * Internally the method cancels its subscription after this element. This 759 * Internally the method cancels its subscription after this element. This
786 * means that single-subscription (non-broadcast) streams are closed and 760 * means that single-subscription (non-broadcast) streams are closed and
787 * cannot be reused after a call to this method. 761 * cannot be reused after a call to this method.
788 */ 762 */
789 Future<bool> any(bool test(T element)) { 763 Future<bool> any(bool test(T element)) {
790 _Future<bool> future = new _Future<bool>(); 764 _Future<bool> future = new _Future<bool>();
791 StreamSubscription subscription; 765 StreamSubscription subscription;
792 subscription = this.listen( 766 subscription = this.listen(
793 (T element) { 767 (T element) {
794 _runUserCode( 768 _runUserCode(() => test(element), (bool isMatch) {
795 () => test(element), 769 if (isMatch) {
796 (bool isMatch) { 770 _cancelAndValue(subscription, future, true);
797 if (isMatch) { 771 }
798 _cancelAndValue(subscription, future, true); 772 }, _cancelAndErrorClosure(subscription, future));
799 }
800 },
801 _cancelAndErrorClosure(subscription, future)
802 );
803 }, 773 },
804 onError: future._completeError, 774 onError: future._completeError,
805 onDone: () { 775 onDone: () {
806 future._complete(false); 776 future._complete(false);
807 }, 777 },
808 cancelOnError: true); 778 cancelOnError: true);
809 return future; 779 return future;
810 } 780 }
811 781
812
813 /** Counts the elements in the stream. */ 782 /** Counts the elements in the stream. */
814 Future<int> get length { 783 Future<int> get length {
815 _Future<int> future = new _Future<int>(); 784 _Future<int> future = new _Future<int>();
816 int count = 0; 785 int count = 0;
817 this.listen( 786 this.listen(
818 (_) { count++; }, 787 (_) {
819 onError: future._completeError, 788 count++;
820 onDone: () { 789 },
821 future._complete(count); 790 onError: future._completeError,
822 }, 791 onDone: () {
823 cancelOnError: true); 792 future._complete(count);
793 },
794 cancelOnError: true);
824 return future; 795 return future;
825 } 796 }
826 797
827 /** 798 /**
828 * Reports whether this stream contains any elements. 799 * Reports whether this stream contains any elements.
829 * 800 *
830 * Stops listening to the stream after the first element has been received. 801 * Stops listening to the stream after the first element has been received.
831 * 802 *
832 * Internally the method cancels its subscription after the first element. 803 * Internally the method cancels its subscription after the first element.
833 * This means that single-subscription (non-broadcast) streams are closed and 804 * This means that single-subscription (non-broadcast) streams are closed and
834 * cannot be reused after a call to this getter. 805 * cannot be reused after a call to this getter.
835 */ 806 */
836 Future<bool> get isEmpty { 807 Future<bool> get isEmpty {
837 _Future<bool> future = new _Future<bool>(); 808 _Future<bool> future = new _Future<bool>();
838 StreamSubscription subscription; 809 StreamSubscription subscription;
839 subscription = this.listen( 810 subscription = this.listen(
840 (_) { 811 (_) {
841 _cancelAndValue(subscription, future, false); 812 _cancelAndValue(subscription, future, false);
842 }, 813 },
843 onError: future._completeError, 814 onError: future._completeError,
844 onDone: () { 815 onDone: () {
845 future._complete(true); 816 future._complete(true);
846 }, 817 },
847 cancelOnError: true); 818 cancelOnError: true);
848 return future; 819 return future;
849 } 820 }
850 821
851 /** Collects the data of this stream in a [List]. */ 822 /** Collects the data of this stream in a [List]. */
852 Future<List<T>> toList() { 823 Future<List<T>> toList() {
853 List<T> result = <T>[]; 824 List<T> result = <T>[];
854 _Future<List<T>> future = new _Future<List<T>>(); 825 _Future<List<T>> future = new _Future<List<T>>();
855 this.listen( 826 this.listen(
856 (T data) { 827 (T data) {
857 result.add(data); 828 result.add(data);
858 }, 829 },
859 onError: future._completeError, 830 onError: future._completeError,
860 onDone: () { 831 onDone: () {
861 future._complete(result); 832 future._complete(result);
862 }, 833 },
863 cancelOnError: true); 834 cancelOnError: true);
864 return future; 835 return future;
865 } 836 }
866 837
867 /** 838 /**
868 * Collects the data of this stream in a [Set]. 839 * Collects the data of this stream in a [Set].
869 * 840 *
870 * The returned set is the same type as returned by `new Set<T>()`. 841 * The returned set is the same type as returned by `new Set<T>()`.
871 * If another type of set is needed, either use [forEach] to add each 842 * If another type of set is needed, either use [forEach] to add each
872 * element to the set, or use 843 * element to the set, or use
873 * `toList().then((list) => new SomeOtherSet.from(list))` 844 * `toList().then((list) => new SomeOtherSet.from(list))`
874 * to create the set. 845 * to create the set.
875 */ 846 */
876 Future<Set<T>> toSet() { 847 Future<Set<T>> toSet() {
877 Set<T> result = new Set<T>(); 848 Set<T> result = new Set<T>();
878 _Future<Set<T>> future = new _Future<Set<T>>(); 849 _Future<Set<T>> future = new _Future<Set<T>>();
879 this.listen( 850 this.listen(
880 (T data) { 851 (T data) {
881 result.add(data); 852 result.add(data);
882 }, 853 },
883 onError: future._completeError, 854 onError: future._completeError,
884 onDone: () { 855 onDone: () {
885 future._complete(result); 856 future._complete(result);
886 }, 857 },
887 cancelOnError: true); 858 cancelOnError: true);
888 return future; 859 return future;
889 } 860 }
890 861
891 /** 862 /**
892 * Discards all data on the stream, but signals when it's done or an error 863 * Discards all data on the stream, but signals when it's done or an error
893 * occurred. 864 * occurred.
894 * 865 *
895 * When subscribing using [drain], cancelOnError will be true. This means 866 * When subscribing using [drain], cancelOnError will be true. This means
896 * that the future will complete with the first error on the stream and then 867 * that the future will complete with the first error on the stream and then
897 * cancel the subscription. 868 * cancel the subscription.
898 * 869 *
899 * In case of a `done` event the future completes with the given 870 * In case of a `done` event the future completes with the given
900 * [futureValue]. 871 * [futureValue].
901 */ 872 */
902 Future<E> drain<E>([E futureValue]) 873 Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true)
903 => listen(null, cancelOnError: true).asFuture<E>(futureValue); 874 .asFuture<E>(futureValue);
904 875
905 /** 876 /**
906 * Provides at most the first [count] data events of this stream. 877 * Provides at most the first [count] data events of this stream.
907 * 878 *
908 * Forwards all events of this stream to the returned stream 879 * Forwards all events of this stream to the returned stream
909 * until [count] data events have been forwarded or this stream ends, 880 * until [count] data events have been forwarded or this stream ends,
910 * then ends the returned stream with a done event. 881 * then ends the returned stream with a done event.
911 * 882 *
912 * If this stream produces fewer than [count] data events before it's done, 883 * If this stream produces fewer than [count] data events before it's done,
913 * so will the returned stream. 884 * so will the returned stream.
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after
1007 * If this stream is empty (a done event occurs before the first data event), 978 * If this stream is empty (a done event occurs before the first data event),
1008 * the resulting future completes with a [StateError]. 979 * the resulting future completes with a [StateError].
1009 * 980 *
1010 * Except for the type of the error, this method is equivalent to 981 * Except for the type of the error, this method is equivalent to
1011 * [:this.elementAt(0):]. 982 * [:this.elementAt(0):].
1012 */ 983 */
1013 Future<T> get first { 984 Future<T> get first {
1014 _Future<T> future = new _Future<T>(); 985 _Future<T> future = new _Future<T>();
1015 StreamSubscription subscription; 986 StreamSubscription subscription;
1016 subscription = this.listen( 987 subscription = this.listen(
1017 (T value) { 988 (T value) {
1018 _cancelAndValue(subscription, future, value); 989 _cancelAndValue(subscription, future, value);
1019 }, 990 },
1020 onError: future._completeError, 991 onError: future._completeError,
1021 onDone: () { 992 onDone: () {
1022 try { 993 try {
1023 throw IterableElementError.noElement(); 994 throw IterableElementError.noElement();
1024 } catch (e, s) { 995 } catch (e, s) {
1025 _completeWithErrorCallback(future, e, s); 996 _completeWithErrorCallback(future, e, s);
1026 } 997 }
1027 }, 998 },
1028 cancelOnError: true); 999 cancelOnError: true);
1029 return future; 1000 return future;
1030 } 1001 }
1031 1002
1032 /** 1003 /**
1033 * Returns the last element of the stream. 1004 * Returns the last element of the stream.
1034 * 1005 *
1035 * If an error event occurs before the first data event, the resulting future 1006 * If an error event occurs before the first data event, the resulting future
1036 * is completed with that error. 1007 * is completed with that error.
1037 * 1008 *
1038 * If this stream is empty (a done event occurs before the first data event), 1009 * If this stream is empty (a done event occurs before the first data event),
1039 * the resulting future completes with a [StateError]. 1010 * the resulting future completes with a [StateError].
1040 */ 1011 */
1041 Future<T> get last { 1012 Future<T> get last {
1042 _Future<T> future = new _Future<T>(); 1013 _Future<T> future = new _Future<T>();
1043 T result = null; 1014 T result = null;
1044 bool foundResult = false; 1015 bool foundResult = false;
1045 listen( 1016 listen(
1046 (T value) { 1017 (T value) {
1047 foundResult = true; 1018 foundResult = true;
1048 result = value; 1019 result = value;
1049 }, 1020 },
1050 onError: future._completeError, 1021 onError: future._completeError,
1051 onDone: () { 1022 onDone: () {
1052 if (foundResult) { 1023 if (foundResult) {
1053 future._complete(result); 1024 future._complete(result);
1054 return; 1025 return;
1055 } 1026 }
1056 try { 1027 try {
1057 throw IterableElementError.noElement(); 1028 throw IterableElementError.noElement();
1058 } catch (e, s) { 1029 } catch (e, s) {
1059 _completeWithErrorCallback(future, e, s); 1030 _completeWithErrorCallback(future, e, s);
1060 } 1031 }
1061 }, 1032 },
1062 cancelOnError: true); 1033 cancelOnError: true);
1063 return future; 1034 return future;
1064 } 1035 }
1065 1036
1066 /** 1037 /**
1067 * Returns the single element. 1038 * Returns the single element.
1068 * 1039 *
1069 * If an error event occurs before or after the first data event, the 1040 * If an error event occurs before or after the first data event, the
1070 * resulting future is completed with that error. 1041 * resulting future is completed with that error.
1071 * 1042 *
1072 * If [this] is empty or has more than one element throws a [StateError]. 1043 * If [this] is empty or has more than one element throws a [StateError].
1073 */ 1044 */
1074 Future<T> get single { 1045 Future<T> get single {
1075 _Future<T> future = new _Future<T>(); 1046 _Future<T> future = new _Future<T>();
1076 T result = null; 1047 T result = null;
1077 bool foundResult = false; 1048 bool foundResult = false;
1078 StreamSubscription subscription; 1049 StreamSubscription subscription;
1079 subscription = this.listen( 1050 subscription = this.listen(
1080 (T value) { 1051 (T value) {
1081 if (foundResult) { 1052 if (foundResult) {
1082 // This is the second element we get. 1053 // This is the second element we get.
1054 try {
1055 throw IterableElementError.tooMany();
1056 } catch (e, s) {
1057 _cancelAndErrorWithReplacement(subscription, future, e, s);
1058 }
1059 return;
1060 }
1061 foundResult = true;
1062 result = value;
1063 },
1064 onError: future._completeError,
1065 onDone: () {
1066 if (foundResult) {
1067 future._complete(result);
1068 return;
1069 }
1083 try { 1070 try {
1084 throw IterableElementError.tooMany(); 1071 throw IterableElementError.noElement();
1085 } catch (e, s) { 1072 } catch (e, s) {
1086 _cancelAndErrorWithReplacement(subscription, future, e, s); 1073 _completeWithErrorCallback(future, e, s);
1087 } 1074 }
1088 return; 1075 },
1089 } 1076 cancelOnError: true);
1090 foundResult = true;
1091 result = value;
1092 },
1093 onError: future._completeError,
1094 onDone: () {
1095 if (foundResult) {
1096 future._complete(result);
1097 return;
1098 }
1099 try {
1100 throw IterableElementError.noElement();
1101 } catch (e, s) {
1102 _completeWithErrorCallback(future, e, s);
1103 }
1104 },
1105 cancelOnError: true);
1106 return future; 1077 return future;
1107 } 1078 }
1108 1079
1109 /** 1080 /**
1110 * Finds the first element of this stream matching [test]. 1081 * Finds the first element of this stream matching [test].
1111 * 1082 *
1112 * Returns a future that is filled with the first element of this stream 1083 * Returns a future that is filled with the first element of this stream
1113 * that [test] returns true for. 1084 * that [test] returns true for.
1114 * 1085 *
1115 * If no such element is found before this stream is done, and a 1086 * If no such element is found before this stream is done, and a
1116 * [defaultValue] function is provided, the result of calling [defaultValue] 1087 * [defaultValue] function is provided, the result of calling [defaultValue]
1117 * becomes the value of the future. 1088 * becomes the value of the future.
1118 * 1089 *
1119 * Stops listening to the stream after the first matching element has been 1090 * Stops listening to the stream after the first matching element has been
1120 * received. 1091 * received.
1121 * 1092 *
1122 * Internally the method cancels its subscription after the first element that 1093 * Internally the method cancels its subscription after the first element that
1123 * matches the predicate. This means that single-subscription (non-broadcast) 1094 * matches the predicate. This means that single-subscription (non-broadcast)
1124 * streams are closed and cannot be reused after a call to this method. 1095 * streams are closed and cannot be reused after a call to this method.
1125 * 1096 *
1126 * If an error occurs, or if this stream ends without finding a match and 1097 * If an error occurs, or if this stream ends without finding a match and
1127 * with no [defaultValue] function provided, the future will receive an 1098 * with no [defaultValue] function provided, the future will receive an
1128 * error. 1099 * error.
1129 */ 1100 */
1130 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { 1101 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) {
1131 _Future<dynamic> future = new _Future(); 1102 _Future<dynamic> future = new _Future();
1132 StreamSubscription subscription; 1103 StreamSubscription subscription;
1133 subscription = this.listen( 1104 subscription = this.listen(
1134 (T value) { 1105 (T value) {
1135 _runUserCode( 1106 _runUserCode(() => test(value), (bool isMatch) {
1136 () => test(value),
1137 (bool isMatch) {
1138 if (isMatch) { 1107 if (isMatch) {
1139 _cancelAndValue(subscription, future, value); 1108 _cancelAndValue(subscription, future, value);
1140 } 1109 }
1141 }, 1110 }, _cancelAndErrorClosure(subscription, future));
1142 _cancelAndErrorClosure(subscription, future) 1111 },
1143 ); 1112 onError: future._completeError,
1144 }, 1113 onDone: () {
1145 onError: future._completeError, 1114 if (defaultValue != null) {
1146 onDone: () { 1115 _runUserCode(defaultValue, future._complete, future._completeError);
1147 if (defaultValue != null) { 1116 return;
1148 _runUserCode(defaultValue, future._complete, future._completeError); 1117 }
1149 return; 1118 try {
1150 } 1119 throw IterableElementError.noElement();
1151 try { 1120 } catch (e, s) {
1152 throw IterableElementError.noElement(); 1121 _completeWithErrorCallback(future, e, s);
1153 } catch (e, s) { 1122 }
1154 _completeWithErrorCallback(future, e, s); 1123 },
1155 } 1124 cancelOnError: true);
1156 },
1157 cancelOnError: true);
1158 return future; 1125 return future;
1159 } 1126 }
1160 1127
1161 /** 1128 /**
1162 * Finds the last element in this stream matching [test]. 1129 * Finds the last element in this stream matching [test].
1163 * 1130 *
1164 * As [firstWhere], except that the last matching element is found. 1131 * As [firstWhere], except that the last matching element is found.
1165 * That means that the result cannot be provided before this stream 1132 * That means that the result cannot be provided before this stream
1166 * is done. 1133 * is done.
1167 */ 1134 */
1168 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { 1135 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) {
1169 _Future<dynamic> future = new _Future(); 1136 _Future<dynamic> future = new _Future();
1170 T result = null; 1137 T result = null;
1171 bool foundResult = false; 1138 bool foundResult = false;
1172 StreamSubscription subscription; 1139 StreamSubscription subscription;
1173 subscription = this.listen( 1140 subscription = this.listen(
1174 (T value) { 1141 (T value) {
1175 _runUserCode( 1142 _runUserCode(() => true == test(value), (bool isMatch) {
1176 () => true == test(value),
1177 (bool isMatch) {
1178 if (isMatch) { 1143 if (isMatch) {
1179 foundResult = true; 1144 foundResult = true;
1180 result = value; 1145 result = value;
1181 } 1146 }
1182 }, 1147 }, _cancelAndErrorClosure(subscription, future));
1183 _cancelAndErrorClosure(subscription, future) 1148 },
1184 ); 1149 onError: future._completeError,
1185 }, 1150 onDone: () {
1186 onError: future._completeError, 1151 if (foundResult) {
1187 onDone: () { 1152 future._complete(result);
1188 if (foundResult) { 1153 return;
1189 future._complete(result); 1154 }
1190 return; 1155 if (defaultValue != null) {
1191 } 1156 _runUserCode(defaultValue, future._complete, future._completeError);
1192 if (defaultValue != null) { 1157 return;
1193 _runUserCode(defaultValue, future._complete, future._completeError); 1158 }
1194 return; 1159 try {
1195 } 1160 throw IterableElementError.noElement();
1196 try { 1161 } catch (e, s) {
1197 throw IterableElementError.noElement(); 1162 _completeWithErrorCallback(future, e, s);
1198 } catch (e, s) { 1163 }
1199 _completeWithErrorCallback(future, e, s); 1164 },
1200 } 1165 cancelOnError: true);
1201 },
1202 cancelOnError: true);
1203 return future; 1166 return future;
1204 } 1167 }
1205 1168
1206 /** 1169 /**
1207 * Finds the single element in this stream matching [test]. 1170 * Finds the single element in this stream matching [test].
1208 * 1171 *
1209 * Like [lastMatch], except that it is an error if more than one 1172 * Like [lastMatch], except that it is an error if more than one
1210 * matching element occurs in the stream. 1173 * matching element occurs in the stream.
1211 */ 1174 */
1212 Future<T> singleWhere(bool test(T element)) { 1175 Future<T> singleWhere(bool test(T element)) {
1213 _Future<T> future = new _Future<T>(); 1176 _Future<T> future = new _Future<T>();
1214 T result = null; 1177 T result = null;
1215 bool foundResult = false; 1178 bool foundResult = false;
1216 StreamSubscription subscription; 1179 StreamSubscription subscription;
1217 subscription = this.listen( 1180 subscription = this.listen(
1218 (T value) { 1181 (T value) {
1219 _runUserCode( 1182 _runUserCode(() => true == test(value), (bool isMatch) {
1220 () => true == test(value),
1221 (bool isMatch) {
1222 if (isMatch) { 1183 if (isMatch) {
1223 if (foundResult) { 1184 if (foundResult) {
1224 try { 1185 try {
1225 throw IterableElementError.tooMany(); 1186 throw IterableElementError.tooMany();
1226 } catch (e, s) { 1187 } catch (e, s) {
1227 _cancelAndErrorWithReplacement(subscription, future, e, s); 1188 _cancelAndErrorWithReplacement(subscription, future, e, s);
1228 } 1189 }
1229 return; 1190 return;
1230 } 1191 }
1231 foundResult = true; 1192 foundResult = true;
1232 result = value; 1193 result = value;
1233 } 1194 }
1234 }, 1195 }, _cancelAndErrorClosure(subscription, future));
1235 _cancelAndErrorClosure(subscription, future) 1196 },
1236 ); 1197 onError: future._completeError,
1237 }, 1198 onDone: () {
1238 onError: future._completeError, 1199 if (foundResult) {
1239 onDone: () { 1200 future._complete(result);
1240 if (foundResult) { 1201 return;
1241 future._complete(result); 1202 }
1242 return; 1203 try {
1243 } 1204 throw IterableElementError.noElement();
1244 try { 1205 } catch (e, s) {
1245 throw IterableElementError.noElement(); 1206 _completeWithErrorCallback(future, e, s);
1246 } catch (e, s) { 1207 }
1247 _completeWithErrorCallback(future, e, s); 1208 },
1248 } 1209 cancelOnError: true);
1249 },
1250 cancelOnError: true);
1251 return future; 1210 return future;
1252 } 1211 }
1253 1212
1254 /** 1213 /**
1255 * Returns the value of the [index]th data event of this stream. 1214 * Returns the value of the [index]th data event of this stream.
1256 * 1215 *
1257 * Stops listening to the stream after the [index]th data event has been 1216 * Stops listening to the stream after the [index]th data event has been
1258 * received. 1217 * received.
1259 * 1218 *
1260 * Internally the method cancels its subscription after these elements. This 1219 * Internally the method cancels its subscription after these elements. This
1261 * means that single-subscription (non-broadcast) streams are closed and 1220 * means that single-subscription (non-broadcast) streams are closed and
1262 * cannot be reused after a call to this method. 1221 * cannot be reused after a call to this method.
1263 * 1222 *
1264 * If an error event occurs before the value is found, the future completes 1223 * If an error event occurs before the value is found, the future completes
1265 * with this error. 1224 * with this error.
1266 * 1225 *
1267 * If a done event occurs before the value is found, the future completes 1226 * If a done event occurs before the value is found, the future completes
1268 * with a [RangeError]. 1227 * with a [RangeError].
1269 */ 1228 */
1270 Future<T> elementAt(int index) { 1229 Future<T> elementAt(int index) {
1271 if (index is! int || index < 0) throw new ArgumentError(index); 1230 if (index is! int || index < 0) throw new ArgumentError(index);
1272 _Future<T> future = new _Future<T>(); 1231 _Future<T> future = new _Future<T>();
1273 StreamSubscription subscription; 1232 StreamSubscription subscription;
1274 int elementIndex = 0; 1233 int elementIndex = 0;
1275 subscription = this.listen( 1234 subscription = this.listen(
1276 (T value) { 1235 (T value) {
1277 if (index == elementIndex) { 1236 if (index == elementIndex) {
1278 _cancelAndValue(subscription, future, value); 1237 _cancelAndValue(subscription, future, value);
1279 return; 1238 return;
1280 } 1239 }
1281 elementIndex += 1; 1240 elementIndex += 1;
1282 }, 1241 },
1283 onError: future._completeError, 1242 onError: future._completeError,
1284 onDone: () { 1243 onDone: () {
1285 future._completeError( 1244 future._completeError(
1286 new RangeError.index(index, this, "index", null, elementIndex)); 1245 new RangeError.index(index, this, "index", null, elementIndex));
1287 }, 1246 },
1288 cancelOnError: true); 1247 cancelOnError: true);
1289 return future; 1248 return future;
1290 } 1249 }
1291 1250
1292 /** 1251 /**
1293 * Creates a new stream with the same events as this stream. 1252 * Creates a new stream with the same events as this stream.
1294 * 1253 *
1295 * Whenever more than [timeLimit] passes between two events from this stream, 1254 * Whenever more than [timeLimit] passes between two events from this stream,
1296 * the [onTimeout] function is called. 1255 * the [onTimeout] function is called.
1297 * 1256 *
1298 * The countdown doesn't start until the returned stream is listened to. 1257 * The countdown doesn't start until the returned stream is listened to.
(...skipping 18 matching lines...) Expand all
1317 StreamSubscription<T> subscription; 1276 StreamSubscription<T> subscription;
1318 Timer timer; 1277 Timer timer;
1319 Zone zone; 1278 Zone zone;
1320 _TimerCallback timeout; 1279 _TimerCallback timeout;
1321 1280
1322 void onData(T event) { 1281 void onData(T event) {
1323 timer.cancel(); 1282 timer.cancel();
1324 controller.add(event); 1283 controller.add(event);
1325 timer = zone.createTimer(timeLimit, timeout); 1284 timer = zone.createTimer(timeLimit, timeout);
1326 } 1285 }
1286
1327 void onError(error, StackTrace stackTrace) { 1287 void onError(error, StackTrace stackTrace) {
1328 timer.cancel(); 1288 timer.cancel();
1329 assert(controller is _StreamController || 1289 assert(controller is _StreamController ||
1330 controller is _BroadcastStreamController); 1290 controller is _BroadcastStreamController);
1331 dynamic eventSink = controller; 1291 dynamic eventSink = controller;
1332 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. 1292 eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
1333 timer = zone.createTimer(timeLimit, timeout); 1293 timer = zone.createTimer(timeLimit, timeout);
1334 } 1294 }
1295
1335 void onDone() { 1296 void onDone() {
1336 timer.cancel(); 1297 timer.cancel();
1337 controller.close(); 1298 controller.close();
1338 } 1299 }
1300
1339 void onListen() { 1301 void onListen() {
1340 // This is the onListen callback for of controller. 1302 // This is the onListen callback for of controller.
1341 // It runs in the same zone that the subscription was created in. 1303 // It runs in the same zone that the subscription was created in.
1342 // Use that zone for creating timers and running the onTimeout 1304 // Use that zone for creating timers and running the onTimeout
1343 // callback. 1305 // callback.
1344 zone = Zone.current; 1306 zone = Zone.current;
1345 if (onTimeout == null) { 1307 if (onTimeout == null) {
1346 timeout = () { 1308 timeout = () {
1347 controller.addError(new TimeoutException("No stream event", 1309 controller.addError(
1348 timeLimit), null); 1310 new TimeoutException("No stream event", timeLimit), null);
1349 }; 1311 };
1350 } else { 1312 } else {
1351 // TODO(floitsch): the return type should be 'void', and the type 1313 // TODO(floitsch): the return type should be 'void', and the type
1352 // should be inferred. 1314 // should be inferred.
1353 var registeredOnTimeout = 1315 var registeredOnTimeout =
1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); 1316 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout);
1355 _ControllerEventSinkWrapper wrapper = 1317 _ControllerEventSinkWrapper wrapper =
1356 new _ControllerEventSinkWrapper(null); 1318 new _ControllerEventSinkWrapper(null);
1357 timeout = () { 1319 timeout = () {
1358 wrapper._sink = controller; // Only valid during call. 1320 wrapper._sink = controller; // Only valid during call.
1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); 1321 zone.runUnaryGuarded(registeredOnTimeout, wrapper);
1360 wrapper._sink = null; 1322 wrapper._sink = null;
1361 }; 1323 };
1362 } 1324 }
1363 1325
1364 subscription = this.listen(onData, onError: onError, onDone: onDone); 1326 subscription = this.listen(onData, onError: onError, onDone: onDone);
1365 timer = zone.createTimer(timeLimit, timeout); 1327 timer = zone.createTimer(timeLimit, timeout);
1366 } 1328 }
1329
1367 Future onCancel() { 1330 Future onCancel() {
1368 timer.cancel(); 1331 timer.cancel();
1369 Future result = subscription.cancel(); 1332 Future result = subscription.cancel();
1370 subscription = null; 1333 subscription = null;
1371 return result; 1334 return result;
1372 } 1335 }
1336
1373 controller = isBroadcast 1337 controller = isBroadcast
1374 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) 1338 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
1375 : new _SyncStreamController<T>( 1339 : new _SyncStreamController<T>(onListen, () {
1376 onListen, 1340 // Don't null the timer, onCancel may call cancel again.
1377 () { 1341 timer.cancel();
1378 // Don't null the timer, onCancel may call cancel again. 1342 subscription.pause();
1379 timer.cancel(); 1343 }, () {
1380 subscription.pause(); 1344 subscription.resume();
1381 }, 1345 timer = zone.createTimer(timeLimit, timeout);
1382 () { 1346 }, onCancel);
1383 subscription.resume();
1384 timer = zone.createTimer(timeLimit, timeout);
1385 },
1386 onCancel);
1387 return controller.stream; 1347 return controller.stream;
1388 } 1348 }
1389 } 1349 }
1390 1350
1391 /** 1351 /**
1392 * A subscription on events from a [Stream]. 1352 * A subscription on events from a [Stream].
1393 * 1353 *
1394 * When you listen on a [Stream] using [Stream.listen], 1354 * When you listen on a [Stream] using [Stream.listen],
1395 * a [StreamSubscription] object is returned. 1355 * a [StreamSubscription] object is returned.
1396 * 1356 *
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after
1494 * 1454 *
1495 * In case of an error the subscription will automatically cancel (even 1455 * In case of an error the subscription will automatically cancel (even
1496 * when it was listening with `cancelOnError` set to `false`). 1456 * when it was listening with `cancelOnError` set to `false`).
1497 * 1457 *
1498 * In case of a `done` event the future completes with the given 1458 * In case of a `done` event the future completes with the given
1499 * [futureValue]. 1459 * [futureValue].
1500 */ 1460 */
1501 Future<E> asFuture<E>([E futureValue]); 1461 Future<E> asFuture<E>([E futureValue]);
1502 } 1462 }
1503 1463
1504
1505 /** 1464 /**
1506 * An interface that abstracts creation or handling of [Stream] events. 1465 * An interface that abstracts creation or handling of [Stream] events.
1507 */ 1466 */
1508 abstract class EventSink<T> implements Sink<T> { 1467 abstract class EventSink<T> implements Sink<T> {
1509 /** Send a data event to a stream. */ 1468 /** Send a data event to a stream. */
1510 void add(T event); 1469 void add(T event);
1511 1470
1512 /** Send an async error to a stream. */ 1471 /** Send an async error to a stream. */
1513 void addError(errorEvent, [StackTrace stackTrace]); 1472 void addError(errorEvent, [StackTrace stackTrace]);
1514 1473
1515 /** Close the sink. No further events can be added after closing. */ 1474 /** Close the sink. No further events can be added after closing. */
1516 void close(); 1475 void close();
1517 } 1476 }
1518 1477
1519
1520 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1478 /** [Stream] wrapper that only exposes the [Stream] interface. */
1521 class StreamView<T> extends Stream<T> { 1479 class StreamView<T> extends Stream<T> {
1522 final Stream<T> _stream; 1480 final Stream<T> _stream;
1523 1481
1524 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); 1482 const StreamView(Stream<T> stream)
1483 : _stream = stream,
1484 super._internal();
1525 1485
1526 bool get isBroadcast => _stream.isBroadcast; 1486 bool get isBroadcast => _stream.isBroadcast;
1527 1487
1528 Stream<T> asBroadcastStream( 1488 Stream<T> asBroadcastStream(
1529 {void onListen(StreamSubscription<T> subscription), 1489 {void onListen(StreamSubscription<T> subscription),
1530 void onCancel(StreamSubscription<T> subscription)}) 1490 void onCancel(StreamSubscription<T> subscription)}) =>
1531 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); 1491 _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
1532 1492
1533 StreamSubscription<T> listen(void onData(T value), 1493 StreamSubscription<T> listen(void onData(T value),
1534 { Function onError, 1494 {Function onError, void onDone(), bool cancelOnError}) {
1535 void onDone(), 1495 return _stream.listen(onData,
1536 bool cancelOnError }) { 1496 onError: onError, onDone: onDone, cancelOnError: cancelOnError);
1537 return _stream.listen(onData, onError: onError, onDone: onDone,
1538 cancelOnError: cancelOnError);
1539 } 1497 }
1540 } 1498 }
1541 1499
1542
1543 /** 1500 /**
1544 * Abstract interface for a "sink" accepting multiple entire streams. 1501 * Abstract interface for a "sink" accepting multiple entire streams.
1545 * 1502 *
1546 * A consumer can accept a number of consecutive streams using [addStream], 1503 * A consumer can accept a number of consecutive streams using [addStream],
1547 * and when no further data need to be added, the [close] method tells the 1504 * and when no further data need to be added, the [close] method tells the
1548 * consumer to complete its work and shut down. 1505 * consumer to complete its work and shut down.
1549 * 1506 *
1550 * This class is not just a [Sink<Stream>] because it is also combined with 1507 * This class is not just a [Sink<Stream>] because it is also combined with
1551 * other [Sink] classes, like it's combined with [EventSink] in the 1508 * other [Sink] classes, like it's combined with [EventSink] in the
1552 * [StreamSink] class. 1509 * [StreamSink] class.
(...skipping 29 matching lines...) Expand all
1582 * This allows the consumer to complete any remaining work and release 1539 * This allows the consumer to complete any remaining work and release
1583 * resources that are no longer needed 1540 * resources that are no longer needed
1584 * 1541 *
1585 * Returns a future which is completed when the consumer has shut down. 1542 * Returns a future which is completed when the consumer has shut down.
1586 * If cleaning up can fail, the error may be reported in the returned future, 1543 * If cleaning up can fail, the error may be reported in the returned future,
1587 * otherwise it completes with `null`. 1544 * otherwise it completes with `null`.
1588 */ 1545 */
1589 Future close(); 1546 Future close();
1590 } 1547 }
1591 1548
1592
1593 /** 1549 /**
1594 * A object that accepts stream events both synchronously and asynchronously. 1550 * A object that accepts stream events both synchronously and asynchronously.
1595 * 1551 *
1596 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and 1552 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
1597 * the synchronous methods from [EventSink]. 1553 * the synchronous methods from [EventSink].
1598 * 1554 *
1599 * The [EventSink] methods can't be used while the [addStream] is called. 1555 * The [EventSink] methods can't be used while the [addStream] is called.
1600 * As soon as the [addStream]'s [Future] completes with a value, the 1556 * As soon as the [addStream]'s [Future] completes with a value, the
1601 * [EventSink] methods can be used again. 1557 * [EventSink] methods can be used again.
1602 * 1558 *
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
1641 * 1597 *
1642 * Otherwise, the returned future will complete when either: 1598 * Otherwise, the returned future will complete when either:
1643 * 1599 *
1644 * * all events have been processed and the sink has been closed, or 1600 * * all events have been processed and the sink has been closed, or
1645 * * the sink has otherwise been stopped from handling more events 1601 * * the sink has otherwise been stopped from handling more events
1646 * (for example by cancelling a stream subscription). 1602 * (for example by cancelling a stream subscription).
1647 */ 1603 */
1648 Future get done; 1604 Future get done;
1649 } 1605 }
1650 1606
1651
1652 /** 1607 /**
1653 * The target of a [Stream.transform] call. 1608 * The target of a [Stream.transform] call.
1654 * 1609 *
1655 * The [Stream.transform] call will pass itself to this object and then return 1610 * The [Stream.transform] call will pass itself to this object and then return
1656 * the resulting stream. 1611 * the resulting stream.
1657 * 1612 *
1658 * It is good practice to write transformers that can be used multiple times. 1613 * It is good practice to write transformers that can be used multiple times.
1659 */ 1614 */
1660 abstract class StreamTransformer<S, T> { 1615 abstract class StreamTransformer<S, T> {
1661 /** 1616 /**
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
1715 * cancelOnError: cancelOnError); 1670 * cancelOnError: cancelOnError);
1716 * }, 1671 * },
1717 * onPause: () { subscription.pause(); }, 1672 * onPause: () { subscription.pause(); },
1718 * onResume: () { subscription.resume(); }, 1673 * onResume: () { subscription.resume(); },
1719 * onCancel: () => subscription.cancel(), 1674 * onCancel: () => subscription.cancel(),
1720 * sync: true); 1675 * sync: true);
1721 * return controller.stream.listen(null); 1676 * return controller.stream.listen(null);
1722 * }); 1677 * });
1723 */ 1678 */
1724 const factory StreamTransformer( 1679 const factory StreamTransformer(
1725 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) 1680 StreamSubscription<T> transformer(
1726 = _StreamSubscriptionTransformer<S, T>; 1681 Stream<S> stream, bool cancelOnError)) =
1682 _StreamSubscriptionTransformer<S, T>;
1727 1683
1728 /** 1684 /**
1729 * Creates a [StreamTransformer] that delegates events to the given functions. 1685 * Creates a [StreamTransformer] that delegates events to the given functions.
1730 * 1686 *
1731 * Example use of a duplicating transformer: 1687 * Example use of a duplicating transformer:
1732 * 1688 *
1733 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( 1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1734 * handleData: (String value, EventSink<String> sink) { 1690 * handleData: (String value, EventSink<String> sink) {
1735 * sink.add(value); 1691 * sink.add(value);
1736 * sink.add(value); // Duplicate the incoming events. 1692 * sink.add(value); // Duplicate the incoming events.
1737 * })); 1693 * }));
1738 */ 1694 */
1739 factory StreamTransformer.fromHandlers({ 1695 factory StreamTransformer.fromHandlers(
1740 void handleData(S data, EventSink<T> sink), 1696 {void handleData(S data, EventSink<T> sink),
1741 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), 1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1742 void handleDone(EventSink<T> sink)}) 1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>;
1743 = _StreamHandlerTransformer<S, T>;
1744 1699
1745 /** 1700 /**
1746 * Transform the incoming [stream]'s events. 1701 * Transform the incoming [stream]'s events.
1747 * 1702 *
1748 * Creates a new stream. 1703 * Creates a new stream.
1749 * When this stream is listened to, it will start listening on [stream], 1704 * When this stream is listened to, it will start listening on [stream],
1750 * and generate events on the new stream based on the events from [stream]. 1705 * and generate events on the new stream based on the events from [stream].
1751 * 1706 *
1752 * Subscriptions on the returned stream should propagate pause state 1707 * Subscriptions on the returned stream should propagate pause state
1753 * to the subscription on [stream]. 1708 * to the subscription on [stream].
1754 */ 1709 */
1755 Stream<T> bind(Stream<S> stream); 1710 Stream<T> bind(Stream<S> stream);
1756 } 1711 }
1757 1712
1758 /** 1713 /**
1759 * An [Iterator] like interface for the values of a [Stream]. 1714 * An [Iterator] like interface for the values of a [Stream].
1760 * 1715 *
1761 * This wraps a [Stream] and a subscription on the stream. It listens 1716 * This wraps a [Stream] and a subscription on the stream. It listens
1762 * on the stream, and completes the future returned by [moveNext] when the 1717 * on the stream, and completes the future returned by [moveNext] when the
1763 * next value becomes available. 1718 * next value becomes available.
1764 * 1719 *
1765 * The stream may be paused between calls to [moveNext]. 1720 * The stream may be paused between calls to [moveNext].
1766 */ 1721 */
1767 abstract class StreamIterator<T> { 1722 abstract class StreamIterator<T> {
1768
1769 /** Create a [StreamIterator] on [stream]. */ 1723 /** Create a [StreamIterator] on [stream]. */
1770 factory StreamIterator(Stream<T> stream) 1724 factory StreamIterator(Stream<T> stream)
1771 // TODO(lrn): use redirecting factory constructor when type 1725 // TODO(lrn): use redirecting factory constructor when type
1772 // arguments are supported. 1726 // arguments are supported.
1773 => new _StreamIterator<T>(stream); 1727 =>
1728 new _StreamIterator<T>(stream);
1774 1729
1775 /** 1730 /**
1776 * Wait for the next stream value to be available. 1731 * Wait for the next stream value to be available.
1777 * 1732 *
1778 * Returns a future which will complete with either `true` or `false`. 1733 * Returns a future which will complete with either `true` or `false`.
1779 * Completing with `true` means that another event has been received and 1734 * Completing with `true` means that another event has been received and
1780 * can be read as [current]. 1735 * can be read as [current].
1781 * Completing with `false` means that the stream iteration is done and 1736 * Completing with `false` means that the stream iteration is done and
1782 * no further events will ever be available. 1737 * no further events will ever be available.
1783 * The future may complete with an error, if the stream produces an error, 1738 * The future may complete with an error, if the stream produces an error,
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after
1815 * If [moveNext] has been called when the iterator is canceled, 1770 * If [moveNext] has been called when the iterator is canceled,
1816 * its returned future will complete with `false` as value, 1771 * its returned future will complete with `false` as value,
1817 * as will all further calls to [moveNext]. 1772 * as will all further calls to [moveNext].
1818 * 1773 *
1819 * Returns a future if the cancel-operation is not completed synchronously. 1774 * Returns a future if the cancel-operation is not completed synchronously.
1820 * Otherwise returns `null`. 1775 * Otherwise returns `null`.
1821 */ 1776 */
1822 Future cancel(); 1777 Future cancel();
1823 } 1778 }
1824 1779
1825
1826 /** 1780 /**
1827 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. 1781 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
1828 */ 1782 */
1829 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1783 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1830 EventSink _sink; 1784 EventSink _sink;
1831 _ControllerEventSinkWrapper(this._sink); 1785 _ControllerEventSinkWrapper(this._sink);
1832 1786
1833 void add(T data) { _sink.add(data); } 1787 void add(T data) {
1788 _sink.add(data);
1789 }
1790
1834 void addError(error, [StackTrace stackTrace]) { 1791 void addError(error, [StackTrace stackTrace]) {
1835 _sink.addError(error, stackTrace); 1792 _sink.addError(error, stackTrace);
1836 } 1793 }
1837 void close() { _sink.close(); } 1794
1795 void close() {
1796 _sink.close();
1797 }
1838 } 1798 }
OLDNEW
« no previous file with comments | « sdk/lib/async/schedule_microtask.dart ('k') | sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698