OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
97 * | 97 * |
98 * When the future completes, the stream will fire one event, either | 98 * When the future completes, the stream will fire one event, either |
99 * data or error, and then close with a done-event. | 99 * data or error, and then close with a done-event. |
100 */ | 100 */ |
101 factory Stream.fromFuture(Future<T> future) { | 101 factory Stream.fromFuture(Future<T> future) { |
102 // Use the controller's buffering to fill in the value even before | 102 // Use the controller's buffering to fill in the value even before |
103 // the stream has a listener. For a single value, it's not worth it | 103 // the stream has a listener. For a single value, it's not worth it |
104 // to wait for a listener before doing the `then` on the future. | 104 // to wait for a listener before doing the `then` on the future. |
105 _StreamController<T> controller = new StreamController<T>(sync: true); | 105 _StreamController<T> controller = new StreamController<T>(sync: true); |
106 future.then((value) { | 106 future.then((value) { |
107 controller._add(value); | 107 controller._add(value); |
108 controller._closeUnchecked(); | 108 controller._closeUnchecked(); |
109 }, | 109 }, onError: (error, stackTrace) { |
110 onError: (error, stackTrace) { | 110 controller._addError(error, stackTrace); |
111 controller._addError(error, stackTrace); | 111 controller._closeUnchecked(); |
112 controller._closeUnchecked(); | 112 }); |
113 }); | |
114 return controller.stream; | 113 return controller.stream; |
115 } | 114 } |
116 | 115 |
117 /** | 116 /** |
118 * Create a stream from a group of futures. | 117 * Create a stream from a group of futures. |
119 * | 118 * |
120 * The stream reports the results of the futures on the stream in the order | 119 * The stream reports the results of the futures on the stream in the order |
121 * in which the futures complete. | 120 * in which the futures complete. |
122 * | 121 * |
123 * If some futures have completed before calling `Stream.fromFutures`, | 122 * If some futures have completed before calling `Stream.fromFutures`, |
(...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
174 /** | 173 /** |
175 * Creates a stream that repeatedly emits events at [period] intervals. | 174 * Creates a stream that repeatedly emits events at [period] intervals. |
176 * | 175 * |
177 * The event values are computed by invoking [computation]. The argument to | 176 * The event values are computed by invoking [computation]. The argument to |
178 * this callback is an integer that starts with 0 and is incremented for | 177 * this callback is an integer that starts with 0 and is incremented for |
179 * every event. | 178 * every event. |
180 * | 179 * |
181 * If [computation] is omitted the event values will all be `null`. | 180 * If [computation] is omitted the event values will all be `null`. |
182 */ | 181 */ |
183 factory Stream.periodic(Duration period, | 182 factory Stream.periodic(Duration period, |
184 [T computation(int computationCount)]) { | 183 [T computation(int computationCount)]) { |
185 Timer timer; | 184 Timer timer; |
186 int computationCount = 0; | 185 int computationCount = 0; |
187 StreamController<T> controller; | 186 StreamController<T> controller; |
188 // Counts the time that the Stream was running (and not paused). | 187 // Counts the time that the Stream was running (and not paused). |
189 Stopwatch watch = new Stopwatch(); | 188 Stopwatch watch = new Stopwatch(); |
190 | 189 |
191 void sendEvent() { | 190 void sendEvent() { |
192 watch.reset(); | 191 watch.reset(); |
193 T data; | 192 T data; |
194 if (computation != null) { | 193 if (computation != null) { |
195 try { | 194 try { |
196 data = computation(computationCount++); | 195 data = computation(computationCount++); |
197 } catch (e, s) { | 196 } catch (e, s) { |
198 controller.addError(e, s); | 197 controller.addError(e, s); |
199 return; | 198 return; |
200 } | 199 } |
201 } | 200 } |
202 controller.add(data); | 201 controller.add(data); |
203 } | 202 } |
204 | 203 |
205 void startPeriodicTimer() { | 204 void startPeriodicTimer() { |
206 assert(timer == null); | 205 assert(timer == null); |
207 timer = new Timer.periodic(period, (Timer timer) { | 206 timer = new Timer.periodic(period, (Timer timer) { |
208 sendEvent(); | 207 sendEvent(); |
209 }); | 208 }); |
210 } | 209 } |
211 | 210 |
212 controller = new StreamController<T>(sync: true, | 211 controller = new StreamController<T>( |
| 212 sync: true, |
213 onListen: () { | 213 onListen: () { |
214 watch.start(); | 214 watch.start(); |
215 startPeriodicTimer(); | 215 startPeriodicTimer(); |
216 }, | 216 }, |
217 onPause: () { | 217 onPause: () { |
218 timer.cancel(); | 218 timer.cancel(); |
219 timer = null; | 219 timer = null; |
220 watch.stop(); | 220 watch.stop(); |
221 }, | 221 }, |
222 onResume: () { | 222 onResume: () { |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
270 * // Some generic types ommitted for brevety. | 270 * // Some generic types ommitted for brevety. |
271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( | 271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( |
272 * stream, | 272 * stream, |
273 * (EventSink sink) => new DuplicationSink(sink)); | 273 * (EventSink sink) => new DuplicationSink(sink)); |
274 * } | 274 * } |
275 * | 275 * |
276 * stringStream.transform(new DuplicationTransformer()); | 276 * stringStream.transform(new DuplicationTransformer()); |
277 * | 277 * |
278 * The resulting stream is a broadcast stream if [source] is. | 278 * The resulting stream is a broadcast stream if [source] is. |
279 */ | 279 */ |
280 factory Stream.eventTransformed(Stream source, | 280 factory Stream.eventTransformed( |
281 EventSink mapSink(EventSink<T> sink)) { | 281 Stream source, EventSink mapSink(EventSink<T> sink)) { |
282 return new _BoundSinkStream(source, mapSink); | 282 return new _BoundSinkStream(source, mapSink); |
283 } | 283 } |
284 | 284 |
285 /** | 285 /** |
286 * Reports whether this stream is a broadcast stream. | 286 * Reports whether this stream is a broadcast stream. |
287 */ | 287 */ |
288 bool get isBroadcast => false; | 288 bool get isBroadcast => false; |
289 | 289 |
290 /** | 290 /** |
291 * Returns a multi-subscription stream that produces the same events as this. | 291 * Returns a multi-subscription stream that produces the same events as this. |
292 * | 292 * |
293 * The returned stream will subscribe to this stream when its first | 293 * The returned stream will subscribe to this stream when its first |
294 * subscriber is added, and will stay subscribed until this stream ends, | 294 * subscriber is added, and will stay subscribed until this stream ends, |
295 * or a callback cancels the subscription. | 295 * or a callback cancels the subscription. |
296 * | 296 * |
297 * If [onListen] is provided, it is called with a subscription-like object | 297 * If [onListen] is provided, it is called with a subscription-like object |
298 * that represents the underlying subscription to this stream. It is | 298 * that represents the underlying subscription to this stream. It is |
299 * possible to pause, resume or cancel the subscription during the call | 299 * possible to pause, resume or cancel the subscription during the call |
300 * to [onListen]. It is not possible to change the event handlers, including | 300 * to [onListen]. It is not possible to change the event handlers, including |
301 * using [StreamSubscription.asFuture]. | 301 * using [StreamSubscription.asFuture]. |
302 * | 302 * |
303 * If [onCancel] is provided, it is called in a similar way to [onListen] | 303 * If [onCancel] is provided, it is called in a similar way to [onListen] |
304 * when the returned stream stops having listener. If it later gets | 304 * when the returned stream stops having listener. If it later gets |
305 * a new listener, the [onListen] function is called again. | 305 * a new listener, the [onListen] function is called again. |
306 * | 306 * |
307 * Use the callbacks, for example, for pausing the underlying subscription | 307 * Use the callbacks, for example, for pausing the underlying subscription |
308 * while having no subscribers to prevent losing events, or canceling the | 308 * while having no subscribers to prevent losing events, or canceling the |
309 * subscription when there are no listeners. | 309 * subscription when there are no listeners. |
310 */ | 310 */ |
311 Stream<T> asBroadcastStream({ | 311 Stream<T> asBroadcastStream( |
312 void onListen(StreamSubscription<T> subscription), | 312 {void onListen(StreamSubscription<T> subscription), |
313 void onCancel(StreamSubscription<T> subscription) }) { | 313 void onCancel(StreamSubscription<T> subscription)}) { |
314 return new _AsBroadcastStream<T>(this, onListen, onCancel); | 314 return new _AsBroadcastStream<T>(this, onListen, onCancel); |
315 } | 315 } |
316 | 316 |
317 /** | 317 /** |
318 * Adds a subscription to this stream. | 318 * Adds a subscription to this stream. |
319 * | 319 * |
320 * Returns a [StreamSubscription] which handles events from the stream using | 320 * Returns a [StreamSubscription] which handles events from the stream using |
321 * the provided [onData], [onError] and [onDone] handlers. | 321 * the provided [onData], [onError] and [onDone] handlers. |
322 * The handlers can be changed on the subscription, but they start out | 322 * The handlers can be changed on the subscription, but they start out |
323 * as the provided functions. | 323 * as the provided functions. |
(...skipping 19 matching lines...) Expand all Loading... |
343 * called. If [onDone] is `null`, nothing happens. | 343 * called. If [onDone] is `null`, nothing happens. |
344 * | 344 * |
345 * If [cancelOnError] is true, the subscription is automatically cancelled | 345 * If [cancelOnError] is true, the subscription is automatically cancelled |
346 * when the first error event is delivered. The default is `false`. | 346 * when the first error event is delivered. The default is `false`. |
347 * | 347 * |
348 * While a subscription is paused, or when it has been cancelled, | 348 * While a subscription is paused, or when it has been cancelled, |
349 * the subscription doesn't receive events and none of the | 349 * the subscription doesn't receive events and none of the |
350 * event handler functions are called. | 350 * event handler functions are called. |
351 */ | 351 */ |
352 StreamSubscription<T> listen(void onData(T event), | 352 StreamSubscription<T> listen(void onData(T event), |
353 { Function onError, | 353 {Function onError, void onDone(), bool cancelOnError}); |
354 void onDone(), | |
355 bool cancelOnError}); | |
356 | 354 |
357 /** | 355 /** |
358 * Creates a new stream from this stream that discards some data events. | 356 * Creates a new stream from this stream that discards some data events. |
359 * | 357 * |
360 * The new stream sends the same error and done events as this stream, | 358 * The new stream sends the same error and done events as this stream, |
361 * but it only sends the data events that satisfy the [test]. | 359 * but it only sends the data events that satisfy the [test]. |
362 * | 360 * |
363 * The returned stream is a broadcast stream if this stream is. | 361 * The returned stream is a broadcast stream if this stream is. |
364 * If a broadcast stream is listened to more than once, each subscription | 362 * If a broadcast stream is listened to more than once, each subscription |
365 * will individually perform the `test`. | 363 * will individually perform the `test`. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
398 * | 396 * |
399 * The returned stream is a broadcast stream if this stream is. | 397 * The returned stream is a broadcast stream if this stream is. |
400 */ | 398 */ |
401 Stream<E> asyncMap<E>(convert(T event)) { | 399 Stream<E> asyncMap<E>(convert(T event)) { |
402 StreamController<E> controller; | 400 StreamController<E> controller; |
403 StreamSubscription<T> subscription; | 401 StreamSubscription<T> subscription; |
404 | 402 |
405 void onListen() { | 403 void onListen() { |
406 final add = controller.add; | 404 final add = controller.add; |
407 assert(controller is _StreamController || | 405 assert(controller is _StreamController || |
408 controller is _BroadcastStreamController); | 406 controller is _BroadcastStreamController); |
409 final _EventSink<E> eventSink = | 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
410 controller as Object /*=_EventSink<E>*/; | |
411 final addError = eventSink._addError; | 408 final addError = eventSink._addError; |
412 subscription = this.listen( | 409 subscription = this.listen((T event) { |
413 (T event) { | 410 dynamic newValue; |
414 dynamic newValue; | 411 try { |
415 try { | 412 newValue = convert(event); |
416 newValue = convert(event); | 413 } catch (e, s) { |
417 } catch (e, s) { | 414 controller.addError(e, s); |
418 controller.addError(e, s); | 415 return; |
419 return; | 416 } |
420 } | 417 if (newValue is Future) { |
421 if (newValue is Future) { | 418 subscription.pause(); |
422 subscription.pause(); | 419 newValue |
423 newValue.then(add, onError: addError) | 420 .then(add, onError: addError) |
424 .whenComplete(subscription.resume); | 421 .whenComplete(subscription.resume); |
425 } else { | 422 } else { |
426 controller.add(newValue as Object/*=E*/); | 423 controller.add(newValue as Object/*=E*/); |
427 } | 424 } |
428 }, | 425 }, onError: addError, onDone: controller.close); |
429 onError: addError, | |
430 onDone: controller.close | |
431 ); | |
432 } | 426 } |
433 | 427 |
434 if (this.isBroadcast) { | 428 if (this.isBroadcast) { |
435 controller = new StreamController<E>.broadcast( | 429 controller = new StreamController<E>.broadcast( |
436 onListen: onListen, | 430 onListen: onListen, |
437 onCancel: () { subscription.cancel(); }, | 431 onCancel: () { |
438 sync: true | 432 subscription.cancel(); |
439 ); | 433 }, |
| 434 sync: true); |
440 } else { | 435 } else { |
441 controller = new StreamController<E>( | 436 controller = new StreamController<E>( |
442 onListen: onListen, | 437 onListen: onListen, |
443 onPause: () { subscription.pause(); }, | 438 onPause: () { |
444 onResume: () { subscription.resume(); }, | 439 subscription.pause(); |
445 onCancel: () => subscription.cancel(), | 440 }, |
446 sync: true | 441 onResume: () { |
447 ); | 442 subscription.resume(); |
| 443 }, |
| 444 onCancel: () => subscription.cancel(), |
| 445 sync: true); |
448 } | 446 } |
449 return controller.stream; | 447 return controller.stream; |
450 } | 448 } |
451 | 449 |
452 /** | 450 /** |
453 * Creates a new stream with the events of a stream per original event. | 451 * Creates a new stream with the events of a stream per original event. |
454 * | 452 * |
455 * This acts like [expand], except that [convert] returns a [Stream] | 453 * This acts like [expand], except that [convert] returns a [Stream] |
456 * instead of an [Iterable]. | 454 * instead of an [Iterable]. |
457 * The events of the returned stream becomes the events of the returned | 455 * The events of the returned stream becomes the events of the returned |
458 * stream, in the order they are produced. | 456 * stream, in the order they are produced. |
459 * | 457 * |
460 * If [convert] returns `null`, no value is put on the output stream, | 458 * If [convert] returns `null`, no value is put on the output stream, |
461 * just as if it returned an empty stream. | 459 * just as if it returned an empty stream. |
462 * | 460 * |
463 * The returned stream is a broadcast stream if this stream is. | 461 * The returned stream is a broadcast stream if this stream is. |
464 */ | 462 */ |
465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { | 463 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
466 StreamController<E> controller; | 464 StreamController<E> controller; |
467 StreamSubscription<T> subscription; | 465 StreamSubscription<T> subscription; |
468 void onListen() { | 466 void onListen() { |
469 assert(controller is _StreamController || | 467 assert(controller is _StreamController || |
470 controller is _BroadcastStreamController); | 468 controller is _BroadcastStreamController); |
471 final _EventSink<E> eventSink = | 469 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
472 controller as Object /*=_EventSink<E>*/; | 470 subscription = this.listen((T event) { |
473 subscription = this.listen( | 471 Stream<E> newStream; |
474 (T event) { | 472 try { |
475 Stream<E> newStream; | 473 newStream = convert(event); |
476 try { | 474 } catch (e, s) { |
477 newStream = convert(event); | 475 controller.addError(e, s); |
478 } catch (e, s) { | 476 return; |
479 controller.addError(e, s); | 477 } |
480 return; | 478 if (newStream != null) { |
481 } | 479 subscription.pause(); |
482 if (newStream != null) { | 480 controller.addStream(newStream).whenComplete(subscription.resume); |
483 subscription.pause(); | 481 } |
484 controller.addStream(newStream) | 482 }, |
485 .whenComplete(subscription.resume); | 483 onError: eventSink._addError, // Avoid Zone error replacement. |
486 } | 484 onDone: controller.close); |
487 }, | |
488 onError: eventSink._addError, // Avoid Zone error replacement. | |
489 onDone: controller.close | |
490 ); | |
491 } | 485 } |
| 486 |
492 if (this.isBroadcast) { | 487 if (this.isBroadcast) { |
493 controller = new StreamController<E>.broadcast( | 488 controller = new StreamController<E>.broadcast( |
494 onListen: onListen, | 489 onListen: onListen, |
495 onCancel: () { subscription.cancel(); }, | 490 onCancel: () { |
496 sync: true | 491 subscription.cancel(); |
497 ); | 492 }, |
| 493 sync: true); |
498 } else { | 494 } else { |
499 controller = new StreamController<E>( | 495 controller = new StreamController<E>( |
500 onListen: onListen, | 496 onListen: onListen, |
501 onPause: () { subscription.pause(); }, | 497 onPause: () { |
502 onResume: () { subscription.resume(); }, | 498 subscription.pause(); |
503 onCancel: () => subscription.cancel(), | 499 }, |
504 sync: true | 500 onResume: () { |
505 ); | 501 subscription.resume(); |
| 502 }, |
| 503 onCancel: () => subscription.cancel(), |
| 504 sync: true); |
506 } | 505 } |
507 return controller.stream; | 506 return controller.stream; |
508 } | 507 } |
509 | 508 |
510 /** | 509 /** |
511 * Creates a wrapper Stream that intercepts some errors from this stream. | 510 * Creates a wrapper Stream that intercepts some errors from this stream. |
512 * | 511 * |
513 * If this stream sends an error that matches [test], then it is intercepted | 512 * If this stream sends an error that matches [test], then it is intercepted |
514 * by the [onError] function. | 513 * by the [onError] function. |
515 * | 514 * |
(...skipping 12 matching lines...) Expand all Loading... |
528 * or simply return to make the stream forget the error. | 527 * or simply return to make the stream forget the error. |
529 * | 528 * |
530 * If you need to transform an error into a data event, use the more generic | 529 * If you need to transform an error into a data event, use the more generic |
531 * [Stream.transform] to handle the event by writing a data event to | 530 * [Stream.transform] to handle the event by writing a data event to |
532 * the output sink. | 531 * the output sink. |
533 * | 532 * |
534 * The returned stream is a broadcast stream if this stream is. | 533 * The returned stream is a broadcast stream if this stream is. |
535 * If a broadcast stream is listened to more than once, each subscription | 534 * If a broadcast stream is listened to more than once, each subscription |
536 * will individually perform the `test` and handle the error. | 535 * will individually perform the `test` and handle the error. |
537 */ | 536 */ |
538 Stream<T> handleError(Function onError, { bool test(error) }) { | 537 Stream<T> handleError(Function onError, {bool test(error)}) { |
539 return new _HandleErrorStream<T>(this, onError, test); | 538 return new _HandleErrorStream<T>(this, onError, test); |
540 } | 539 } |
541 | 540 |
542 /** | 541 /** |
543 * Creates a new stream from this stream that converts each element | 542 * Creates a new stream from this stream that converts each element |
544 * into zero or more events. | 543 * into zero or more events. |
545 * | 544 * |
546 * Each incoming event is converted to an [Iterable] of new events, | 545 * Each incoming event is converted to an [Iterable] of new events, |
547 * and each of these new events are then sent by the returned stream | 546 * and each of these new events are then sent by the returned stream |
548 * in order. | 547 * in order. |
(...skipping 29 matching lines...) Expand all Loading... |
578 } | 577 } |
579 | 578 |
580 /** | 579 /** |
581 * Chains this stream as the input of the provided [StreamTransformer]. | 580 * Chains this stream as the input of the provided [StreamTransformer]. |
582 * | 581 * |
583 * Returns the result of [:streamTransformer.bind:] itself. | 582 * Returns the result of [:streamTransformer.bind:] itself. |
584 * | 583 * |
585 * The `streamTransformer` can decide whether it wants to return a | 584 * The `streamTransformer` can decide whether it wants to return a |
586 * broadcast stream or not. | 585 * broadcast stream or not. |
587 */ | 586 */ |
588 Stream<S> transform<S>( | 587 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
589 StreamTransformer<T, S > streamTransformer) { | |
590 return streamTransformer.bind(this); | 588 return streamTransformer.bind(this); |
591 } | 589 } |
592 | 590 |
593 /** | 591 /** |
594 * Reduces a sequence of values by repeatedly applying [combine]. | 592 * Reduces a sequence of values by repeatedly applying [combine]. |
595 */ | 593 */ |
596 Future<T> reduce(T combine(T previous, T element)) { | 594 Future<T> reduce(T combine(T previous, T element)) { |
597 _Future<T> result = new _Future<T>(); | 595 _Future<T> result = new _Future<T>(); |
598 bool seenFirst = false; | 596 bool seenFirst = false; |
599 T value; | 597 T value; |
600 StreamSubscription subscription; | 598 StreamSubscription subscription; |
601 subscription = this.listen( | 599 subscription = this.listen( |
602 (T element) { | 600 (T element) { |
603 if (seenFirst) { | 601 if (seenFirst) { |
604 _runUserCode(() => combine(value, element), | 602 _runUserCode(() => combine(value, element), (T newValue) { |
605 (T newValue) { value = newValue; }, | 603 value = newValue; |
606 _cancelAndErrorClosure(subscription, result)); | 604 }, _cancelAndErrorClosure(subscription, result)); |
607 } else { | 605 } else { |
608 value = element; | 606 value = element; |
609 seenFirst = true; | 607 seenFirst = true; |
610 } | |
611 }, | |
612 onError: result._completeError, | |
613 onDone: () { | |
614 if (!seenFirst) { | |
615 try { | |
616 throw IterableElementError.noElement(); | |
617 } catch (e, s) { | |
618 _completeWithErrorCallback(result, e, s); | |
619 } | 608 } |
620 } else { | 609 }, |
621 result._complete(value); | 610 onError: result._completeError, |
622 } | 611 onDone: () { |
623 }, | 612 if (!seenFirst) { |
624 cancelOnError: true | 613 try { |
625 ); | 614 throw IterableElementError.noElement(); |
| 615 } catch (e, s) { |
| 616 _completeWithErrorCallback(result, e, s); |
| 617 } |
| 618 } else { |
| 619 result._complete(value); |
| 620 } |
| 621 }, |
| 622 cancelOnError: true); |
626 return result; | 623 return result; |
627 } | 624 } |
628 | 625 |
629 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 626 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
630 Future<S> fold<S>(S initialValue, | 627 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
631 S combine(S previous, T element)) { | |
632 | |
633 _Future<S> result = new _Future<S>(); | 628 _Future<S> result = new _Future<S>(); |
634 S value = initialValue; | 629 S value = initialValue; |
635 StreamSubscription subscription; | 630 StreamSubscription subscription; |
636 subscription = this.listen( | 631 subscription = this.listen((T element) { |
637 (T element) { | 632 _runUserCode(() => combine(value, element), (S newValue) { |
638 _runUserCode( | 633 value = newValue; |
639 () => combine(value, element), | 634 }, _cancelAndErrorClosure(subscription, result)); |
640 (S newValue) { value = newValue; }, | 635 }, onError: (e, st) { |
641 _cancelAndErrorClosure(subscription, result) | 636 result._completeError(e, st); |
642 ); | 637 }, onDone: () { |
643 }, | 638 result._complete(value); |
644 onError: (e, st) { | 639 }, cancelOnError: true); |
645 result._completeError(e, st); | |
646 }, | |
647 onDone: () { | |
648 result._complete(value); | |
649 }, | |
650 cancelOnError: true); | |
651 return result; | 640 return result; |
652 } | 641 } |
653 | 642 |
654 /** | 643 /** |
655 * Collects string of data events' string representations. | 644 * Collects string of data events' string representations. |
656 * | 645 * |
657 * If [separator] is provided, it is inserted between any two | 646 * If [separator] is provided, it is inserted between any two |
658 * elements. | 647 * elements. |
659 * | 648 * |
660 * Any error in the stream causes the future to complete with that | 649 * Any error in the stream causes the future to complete with that |
661 * error. Otherwise it completes with the collected string when | 650 * error. Otherwise it completes with the collected string when |
662 * the "done" event arrives. | 651 * the "done" event arrives. |
663 */ | 652 */ |
664 Future<String> join([String separator = ""]) { | 653 Future<String> join([String separator = ""]) { |
665 _Future<String> result = new _Future<String>(); | 654 _Future<String> result = new _Future<String>(); |
666 StringBuffer buffer = new StringBuffer(); | 655 StringBuffer buffer = new StringBuffer(); |
667 StreamSubscription subscription; | 656 StreamSubscription subscription; |
668 bool first = true; | 657 bool first = true; |
669 subscription = this.listen( | 658 subscription = this.listen((T element) { |
670 (T element) { | 659 if (!first) { |
671 if (!first) { | 660 buffer.write(separator); |
672 buffer.write(separator); | 661 } |
673 } | 662 first = false; |
674 first = false; | 663 try { |
675 try { | 664 buffer.write(element); |
676 buffer.write(element); | 665 } catch (e, s) { |
677 } catch (e, s) { | 666 _cancelAndErrorWithReplacement(subscription, result, e, s); |
678 _cancelAndErrorWithReplacement(subscription, result, e, s); | 667 } |
679 } | 668 }, onError: (e) { |
680 }, | 669 result._completeError(e); |
681 onError: (e) { | 670 }, onDone: () { |
682 result._completeError(e); | 671 result._complete(buffer.toString()); |
683 }, | 672 }, cancelOnError: true); |
684 onDone: () { | |
685 result._complete(buffer.toString()); | |
686 }, | |
687 cancelOnError: true); | |
688 return result; | 673 return result; |
689 } | 674 } |
690 | 675 |
691 /** | 676 /** |
692 * Checks whether [needle] occurs in the elements provided by this stream. | 677 * Checks whether [needle] occurs in the elements provided by this stream. |
693 * | 678 * |
694 * Completes the [Future] when the answer is known. | 679 * Completes the [Future] when the answer is known. |
695 * If this stream reports an error, the [Future] will report that error. | 680 * If this stream reports an error, the [Future] will report that error. |
696 */ | 681 */ |
697 Future<bool> contains(Object needle) { | 682 Future<bool> contains(Object needle) { |
698 _Future<bool> future = new _Future<bool>(); | 683 _Future<bool> future = new _Future<bool>(); |
699 StreamSubscription subscription; | 684 StreamSubscription subscription; |
700 subscription = this.listen( | 685 subscription = this.listen( |
701 (T element) { | 686 (T element) { |
702 _runUserCode( | 687 _runUserCode(() => (element == needle), (bool isMatch) { |
703 () => (element == needle), | 688 if (isMatch) { |
704 (bool isMatch) { | 689 _cancelAndValue(subscription, future, true); |
705 if (isMatch) { | 690 } |
706 _cancelAndValue(subscription, future, true); | 691 }, _cancelAndErrorClosure(subscription, future)); |
707 } | |
708 }, | |
709 _cancelAndErrorClosure(subscription, future) | |
710 ); | |
711 }, | 692 }, |
712 onError: future._completeError, | 693 onError: future._completeError, |
713 onDone: () { | 694 onDone: () { |
714 future._complete(false); | 695 future._complete(false); |
715 }, | 696 }, |
716 cancelOnError: true); | 697 cancelOnError: true); |
717 return future; | 698 return future; |
718 } | 699 } |
719 | 700 |
720 /** | 701 /** |
721 * Executes [action] on each data event of the stream. | 702 * Executes [action] on each data event of the stream. |
722 * | 703 * |
723 * Completes the returned [Future] when all events of the stream | 704 * Completes the returned [Future] when all events of the stream |
724 * have been processed. Completes the future with an error if the | 705 * have been processed. Completes the future with an error if the |
725 * stream has an error event, or if [action] throws. | 706 * stream has an error event, or if [action] throws. |
726 */ | 707 */ |
727 Future forEach(void action(T element)) { | 708 Future forEach(void action(T element)) { |
728 _Future future = new _Future(); | 709 _Future future = new _Future(); |
729 StreamSubscription subscription; | 710 StreamSubscription subscription; |
730 subscription = this.listen( | 711 subscription = this.listen( |
731 (T element) { | 712 (T element) { |
732 _runUserCode( | 713 _runUserCode(() => action(element), (_) {}, |
733 () => action(element), | 714 _cancelAndErrorClosure(subscription, future)); |
734 (_) {}, | |
735 _cancelAndErrorClosure(subscription, future) | |
736 ); | |
737 }, | 715 }, |
738 onError: future._completeError, | 716 onError: future._completeError, |
739 onDone: () { | 717 onDone: () { |
740 future._complete(null); | 718 future._complete(null); |
741 }, | 719 }, |
742 cancelOnError: true); | 720 cancelOnError: true); |
743 return future; | 721 return future; |
744 } | 722 } |
745 | 723 |
746 /** | 724 /** |
747 * Checks whether [test] accepts all elements provided by this stream. | 725 * Checks whether [test] accepts all elements provided by this stream. |
748 * | 726 * |
749 * Completes the [Future] when the answer is known. | 727 * Completes the [Future] when the answer is known. |
750 * If this stream reports an error, the [Future] will report that error. | 728 * If this stream reports an error, the [Future] will report that error. |
751 */ | 729 */ |
752 Future<bool> every(bool test(T element)) { | 730 Future<bool> every(bool test(T element)) { |
753 _Future<bool> future = new _Future<bool>(); | 731 _Future<bool> future = new _Future<bool>(); |
754 StreamSubscription subscription; | 732 StreamSubscription subscription; |
755 subscription = this.listen( | 733 subscription = this.listen( |
756 (T element) { | 734 (T element) { |
757 _runUserCode( | 735 _runUserCode(() => test(element), (bool isMatch) { |
758 () => test(element), | 736 if (!isMatch) { |
759 (bool isMatch) { | 737 _cancelAndValue(subscription, future, false); |
760 if (!isMatch) { | 738 } |
761 _cancelAndValue(subscription, future, false); | 739 }, _cancelAndErrorClosure(subscription, future)); |
762 } | |
763 }, | |
764 _cancelAndErrorClosure(subscription, future) | |
765 ); | |
766 }, | 740 }, |
767 onError: future._completeError, | 741 onError: future._completeError, |
768 onDone: () { | 742 onDone: () { |
769 future._complete(true); | 743 future._complete(true); |
770 }, | 744 }, |
771 cancelOnError: true); | 745 cancelOnError: true); |
772 return future; | 746 return future; |
773 } | 747 } |
774 | 748 |
775 /** | 749 /** |
776 * Checks whether [test] accepts any element provided by this stream. | 750 * Checks whether [test] accepts any element provided by this stream. |
777 * | 751 * |
778 * Completes the [Future] when the answer is known. | 752 * Completes the [Future] when the answer is known. |
779 * | 753 * |
780 * If this stream reports an error, the [Future] reports that error. | 754 * If this stream reports an error, the [Future] reports that error. |
781 * | 755 * |
782 * Stops listening to the stream after the first matching element has been | 756 * Stops listening to the stream after the first matching element has been |
783 * found. | 757 * found. |
784 * | 758 * |
785 * Internally the method cancels its subscription after this element. This | 759 * Internally the method cancels its subscription after this element. This |
786 * means that single-subscription (non-broadcast) streams are closed and | 760 * means that single-subscription (non-broadcast) streams are closed and |
787 * cannot be reused after a call to this method. | 761 * cannot be reused after a call to this method. |
788 */ | 762 */ |
789 Future<bool> any(bool test(T element)) { | 763 Future<bool> any(bool test(T element)) { |
790 _Future<bool> future = new _Future<bool>(); | 764 _Future<bool> future = new _Future<bool>(); |
791 StreamSubscription subscription; | 765 StreamSubscription subscription; |
792 subscription = this.listen( | 766 subscription = this.listen( |
793 (T element) { | 767 (T element) { |
794 _runUserCode( | 768 _runUserCode(() => test(element), (bool isMatch) { |
795 () => test(element), | 769 if (isMatch) { |
796 (bool isMatch) { | 770 _cancelAndValue(subscription, future, true); |
797 if (isMatch) { | 771 } |
798 _cancelAndValue(subscription, future, true); | 772 }, _cancelAndErrorClosure(subscription, future)); |
799 } | |
800 }, | |
801 _cancelAndErrorClosure(subscription, future) | |
802 ); | |
803 }, | 773 }, |
804 onError: future._completeError, | 774 onError: future._completeError, |
805 onDone: () { | 775 onDone: () { |
806 future._complete(false); | 776 future._complete(false); |
807 }, | 777 }, |
808 cancelOnError: true); | 778 cancelOnError: true); |
809 return future; | 779 return future; |
810 } | 780 } |
811 | 781 |
812 | |
813 /** Counts the elements in the stream. */ | 782 /** Counts the elements in the stream. */ |
814 Future<int> get length { | 783 Future<int> get length { |
815 _Future<int> future = new _Future<int>(); | 784 _Future<int> future = new _Future<int>(); |
816 int count = 0; | 785 int count = 0; |
817 this.listen( | 786 this.listen( |
818 (_) { count++; }, | 787 (_) { |
819 onError: future._completeError, | 788 count++; |
820 onDone: () { | 789 }, |
821 future._complete(count); | 790 onError: future._completeError, |
822 }, | 791 onDone: () { |
823 cancelOnError: true); | 792 future._complete(count); |
| 793 }, |
| 794 cancelOnError: true); |
824 return future; | 795 return future; |
825 } | 796 } |
826 | 797 |
827 /** | 798 /** |
828 * Reports whether this stream contains any elements. | 799 * Reports whether this stream contains any elements. |
829 * | 800 * |
830 * Stops listening to the stream after the first element has been received. | 801 * Stops listening to the stream after the first element has been received. |
831 * | 802 * |
832 * Internally the method cancels its subscription after the first element. | 803 * Internally the method cancels its subscription after the first element. |
833 * This means that single-subscription (non-broadcast) streams are closed and | 804 * This means that single-subscription (non-broadcast) streams are closed and |
834 * cannot be reused after a call to this getter. | 805 * cannot be reused after a call to this getter. |
835 */ | 806 */ |
836 Future<bool> get isEmpty { | 807 Future<bool> get isEmpty { |
837 _Future<bool> future = new _Future<bool>(); | 808 _Future<bool> future = new _Future<bool>(); |
838 StreamSubscription subscription; | 809 StreamSubscription subscription; |
839 subscription = this.listen( | 810 subscription = this.listen( |
840 (_) { | 811 (_) { |
841 _cancelAndValue(subscription, future, false); | 812 _cancelAndValue(subscription, future, false); |
842 }, | 813 }, |
843 onError: future._completeError, | 814 onError: future._completeError, |
844 onDone: () { | 815 onDone: () { |
845 future._complete(true); | 816 future._complete(true); |
846 }, | 817 }, |
847 cancelOnError: true); | 818 cancelOnError: true); |
848 return future; | 819 return future; |
849 } | 820 } |
850 | 821 |
851 /** Collects the data of this stream in a [List]. */ | 822 /** Collects the data of this stream in a [List]. */ |
852 Future<List<T>> toList() { | 823 Future<List<T>> toList() { |
853 List<T> result = <T>[]; | 824 List<T> result = <T>[]; |
854 _Future<List<T>> future = new _Future<List<T>>(); | 825 _Future<List<T>> future = new _Future<List<T>>(); |
855 this.listen( | 826 this.listen( |
856 (T data) { | 827 (T data) { |
857 result.add(data); | 828 result.add(data); |
858 }, | 829 }, |
859 onError: future._completeError, | 830 onError: future._completeError, |
860 onDone: () { | 831 onDone: () { |
861 future._complete(result); | 832 future._complete(result); |
862 }, | 833 }, |
863 cancelOnError: true); | 834 cancelOnError: true); |
864 return future; | 835 return future; |
865 } | 836 } |
866 | 837 |
867 /** | 838 /** |
868 * Collects the data of this stream in a [Set]. | 839 * Collects the data of this stream in a [Set]. |
869 * | 840 * |
870 * The returned set is the same type as returned by `new Set<T>()`. | 841 * The returned set is the same type as returned by `new Set<T>()`. |
871 * If another type of set is needed, either use [forEach] to add each | 842 * If another type of set is needed, either use [forEach] to add each |
872 * element to the set, or use | 843 * element to the set, or use |
873 * `toList().then((list) => new SomeOtherSet.from(list))` | 844 * `toList().then((list) => new SomeOtherSet.from(list))` |
874 * to create the set. | 845 * to create the set. |
875 */ | 846 */ |
876 Future<Set<T>> toSet() { | 847 Future<Set<T>> toSet() { |
877 Set<T> result = new Set<T>(); | 848 Set<T> result = new Set<T>(); |
878 _Future<Set<T>> future = new _Future<Set<T>>(); | 849 _Future<Set<T>> future = new _Future<Set<T>>(); |
879 this.listen( | 850 this.listen( |
880 (T data) { | 851 (T data) { |
881 result.add(data); | 852 result.add(data); |
882 }, | 853 }, |
883 onError: future._completeError, | 854 onError: future._completeError, |
884 onDone: () { | 855 onDone: () { |
885 future._complete(result); | 856 future._complete(result); |
886 }, | 857 }, |
887 cancelOnError: true); | 858 cancelOnError: true); |
888 return future; | 859 return future; |
889 } | 860 } |
890 | 861 |
891 /** | 862 /** |
892 * Discards all data on the stream, but signals when it's done or an error | 863 * Discards all data on the stream, but signals when it's done or an error |
893 * occurred. | 864 * occurred. |
894 * | 865 * |
895 * When subscribing using [drain], cancelOnError will be true. This means | 866 * When subscribing using [drain], cancelOnError will be true. This means |
896 * that the future will complete with the first error on the stream and then | 867 * that the future will complete with the first error on the stream and then |
897 * cancel the subscription. | 868 * cancel the subscription. |
898 * | 869 * |
899 * In case of a `done` event the future completes with the given | 870 * In case of a `done` event the future completes with the given |
900 * [futureValue]. | 871 * [futureValue]. |
901 */ | 872 */ |
902 Future<E> drain<E>([E futureValue]) | 873 Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true) |
903 => listen(null, cancelOnError: true).asFuture<E>(futureValue); | 874 .asFuture<E>(futureValue); |
904 | 875 |
905 /** | 876 /** |
906 * Provides at most the first [count] data events of this stream. | 877 * Provides at most the first [count] data events of this stream. |
907 * | 878 * |
908 * Forwards all events of this stream to the returned stream | 879 * Forwards all events of this stream to the returned stream |
909 * until [count] data events have been forwarded or this stream ends, | 880 * until [count] data events have been forwarded or this stream ends, |
910 * then ends the returned stream with a done event. | 881 * then ends the returned stream with a done event. |
911 * | 882 * |
912 * If this stream produces fewer than [count] data events before it's done, | 883 * If this stream produces fewer than [count] data events before it's done, |
913 * so will the returned stream. | 884 * so will the returned stream. |
(...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1007 * If this stream is empty (a done event occurs before the first data event), | 978 * If this stream is empty (a done event occurs before the first data event), |
1008 * the resulting future completes with a [StateError]. | 979 * the resulting future completes with a [StateError]. |
1009 * | 980 * |
1010 * Except for the type of the error, this method is equivalent to | 981 * Except for the type of the error, this method is equivalent to |
1011 * [:this.elementAt(0):]. | 982 * [:this.elementAt(0):]. |
1012 */ | 983 */ |
1013 Future<T> get first { | 984 Future<T> get first { |
1014 _Future<T> future = new _Future<T>(); | 985 _Future<T> future = new _Future<T>(); |
1015 StreamSubscription subscription; | 986 StreamSubscription subscription; |
1016 subscription = this.listen( | 987 subscription = this.listen( |
1017 (T value) { | 988 (T value) { |
1018 _cancelAndValue(subscription, future, value); | 989 _cancelAndValue(subscription, future, value); |
1019 }, | 990 }, |
1020 onError: future._completeError, | 991 onError: future._completeError, |
1021 onDone: () { | 992 onDone: () { |
1022 try { | 993 try { |
1023 throw IterableElementError.noElement(); | 994 throw IterableElementError.noElement(); |
1024 } catch (e, s) { | 995 } catch (e, s) { |
1025 _completeWithErrorCallback(future, e, s); | 996 _completeWithErrorCallback(future, e, s); |
1026 } | 997 } |
1027 }, | 998 }, |
1028 cancelOnError: true); | 999 cancelOnError: true); |
1029 return future; | 1000 return future; |
1030 } | 1001 } |
1031 | 1002 |
1032 /** | 1003 /** |
1033 * Returns the last element of the stream. | 1004 * Returns the last element of the stream. |
1034 * | 1005 * |
1035 * If an error event occurs before the first data event, the resulting future | 1006 * If an error event occurs before the first data event, the resulting future |
1036 * is completed with that error. | 1007 * is completed with that error. |
1037 * | 1008 * |
1038 * If this stream is empty (a done event occurs before the first data event), | 1009 * If this stream is empty (a done event occurs before the first data event), |
1039 * the resulting future completes with a [StateError]. | 1010 * the resulting future completes with a [StateError]. |
1040 */ | 1011 */ |
1041 Future<T> get last { | 1012 Future<T> get last { |
1042 _Future<T> future = new _Future<T>(); | 1013 _Future<T> future = new _Future<T>(); |
1043 T result = null; | 1014 T result = null; |
1044 bool foundResult = false; | 1015 bool foundResult = false; |
1045 listen( | 1016 listen( |
1046 (T value) { | 1017 (T value) { |
1047 foundResult = true; | 1018 foundResult = true; |
1048 result = value; | 1019 result = value; |
1049 }, | 1020 }, |
1050 onError: future._completeError, | 1021 onError: future._completeError, |
1051 onDone: () { | 1022 onDone: () { |
1052 if (foundResult) { | 1023 if (foundResult) { |
1053 future._complete(result); | 1024 future._complete(result); |
1054 return; | 1025 return; |
1055 } | 1026 } |
1056 try { | 1027 try { |
1057 throw IterableElementError.noElement(); | 1028 throw IterableElementError.noElement(); |
1058 } catch (e, s) { | 1029 } catch (e, s) { |
1059 _completeWithErrorCallback(future, e, s); | 1030 _completeWithErrorCallback(future, e, s); |
1060 } | 1031 } |
1061 }, | 1032 }, |
1062 cancelOnError: true); | 1033 cancelOnError: true); |
1063 return future; | 1034 return future; |
1064 } | 1035 } |
1065 | 1036 |
1066 /** | 1037 /** |
1067 * Returns the single element. | 1038 * Returns the single element. |
1068 * | 1039 * |
1069 * If an error event occurs before or after the first data event, the | 1040 * If an error event occurs before or after the first data event, the |
1070 * resulting future is completed with that error. | 1041 * resulting future is completed with that error. |
1071 * | 1042 * |
1072 * If [this] is empty or has more than one element throws a [StateError]. | 1043 * If [this] is empty or has more than one element throws a [StateError]. |
1073 */ | 1044 */ |
1074 Future<T> get single { | 1045 Future<T> get single { |
1075 _Future<T> future = new _Future<T>(); | 1046 _Future<T> future = new _Future<T>(); |
1076 T result = null; | 1047 T result = null; |
1077 bool foundResult = false; | 1048 bool foundResult = false; |
1078 StreamSubscription subscription; | 1049 StreamSubscription subscription; |
1079 subscription = this.listen( | 1050 subscription = this.listen( |
1080 (T value) { | 1051 (T value) { |
1081 if (foundResult) { | 1052 if (foundResult) { |
1082 // This is the second element we get. | 1053 // This is the second element we get. |
| 1054 try { |
| 1055 throw IterableElementError.tooMany(); |
| 1056 } catch (e, s) { |
| 1057 _cancelAndErrorWithReplacement(subscription, future, e, s); |
| 1058 } |
| 1059 return; |
| 1060 } |
| 1061 foundResult = true; |
| 1062 result = value; |
| 1063 }, |
| 1064 onError: future._completeError, |
| 1065 onDone: () { |
| 1066 if (foundResult) { |
| 1067 future._complete(result); |
| 1068 return; |
| 1069 } |
1083 try { | 1070 try { |
1084 throw IterableElementError.tooMany(); | 1071 throw IterableElementError.noElement(); |
1085 } catch (e, s) { | 1072 } catch (e, s) { |
1086 _cancelAndErrorWithReplacement(subscription, future, e, s); | 1073 _completeWithErrorCallback(future, e, s); |
1087 } | 1074 } |
1088 return; | 1075 }, |
1089 } | 1076 cancelOnError: true); |
1090 foundResult = true; | |
1091 result = value; | |
1092 }, | |
1093 onError: future._completeError, | |
1094 onDone: () { | |
1095 if (foundResult) { | |
1096 future._complete(result); | |
1097 return; | |
1098 } | |
1099 try { | |
1100 throw IterableElementError.noElement(); | |
1101 } catch (e, s) { | |
1102 _completeWithErrorCallback(future, e, s); | |
1103 } | |
1104 }, | |
1105 cancelOnError: true); | |
1106 return future; | 1077 return future; |
1107 } | 1078 } |
1108 | 1079 |
1109 /** | 1080 /** |
1110 * Finds the first element of this stream matching [test]. | 1081 * Finds the first element of this stream matching [test]. |
1111 * | 1082 * |
1112 * Returns a future that is filled with the first element of this stream | 1083 * Returns a future that is filled with the first element of this stream |
1113 * that [test] returns true for. | 1084 * that [test] returns true for. |
1114 * | 1085 * |
1115 * If no such element is found before this stream is done, and a | 1086 * If no such element is found before this stream is done, and a |
1116 * [defaultValue] function is provided, the result of calling [defaultValue] | 1087 * [defaultValue] function is provided, the result of calling [defaultValue] |
1117 * becomes the value of the future. | 1088 * becomes the value of the future. |
1118 * | 1089 * |
1119 * Stops listening to the stream after the first matching element has been | 1090 * Stops listening to the stream after the first matching element has been |
1120 * received. | 1091 * received. |
1121 * | 1092 * |
1122 * Internally the method cancels its subscription after the first element that | 1093 * Internally the method cancels its subscription after the first element that |
1123 * matches the predicate. This means that single-subscription (non-broadcast) | 1094 * matches the predicate. This means that single-subscription (non-broadcast) |
1124 * streams are closed and cannot be reused after a call to this method. | 1095 * streams are closed and cannot be reused after a call to this method. |
1125 * | 1096 * |
1126 * If an error occurs, or if this stream ends without finding a match and | 1097 * If an error occurs, or if this stream ends without finding a match and |
1127 * with no [defaultValue] function provided, the future will receive an | 1098 * with no [defaultValue] function provided, the future will receive an |
1128 * error. | 1099 * error. |
1129 */ | 1100 */ |
1130 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 1101 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
1131 _Future<dynamic> future = new _Future(); | 1102 _Future<dynamic> future = new _Future(); |
1132 StreamSubscription subscription; | 1103 StreamSubscription subscription; |
1133 subscription = this.listen( | 1104 subscription = this.listen( |
1134 (T value) { | 1105 (T value) { |
1135 _runUserCode( | 1106 _runUserCode(() => test(value), (bool isMatch) { |
1136 () => test(value), | |
1137 (bool isMatch) { | |
1138 if (isMatch) { | 1107 if (isMatch) { |
1139 _cancelAndValue(subscription, future, value); | 1108 _cancelAndValue(subscription, future, value); |
1140 } | 1109 } |
1141 }, | 1110 }, _cancelAndErrorClosure(subscription, future)); |
1142 _cancelAndErrorClosure(subscription, future) | 1111 }, |
1143 ); | 1112 onError: future._completeError, |
1144 }, | 1113 onDone: () { |
1145 onError: future._completeError, | 1114 if (defaultValue != null) { |
1146 onDone: () { | 1115 _runUserCode(defaultValue, future._complete, future._completeError); |
1147 if (defaultValue != null) { | 1116 return; |
1148 _runUserCode(defaultValue, future._complete, future._completeError); | 1117 } |
1149 return; | 1118 try { |
1150 } | 1119 throw IterableElementError.noElement(); |
1151 try { | 1120 } catch (e, s) { |
1152 throw IterableElementError.noElement(); | 1121 _completeWithErrorCallback(future, e, s); |
1153 } catch (e, s) { | 1122 } |
1154 _completeWithErrorCallback(future, e, s); | 1123 }, |
1155 } | 1124 cancelOnError: true); |
1156 }, | |
1157 cancelOnError: true); | |
1158 return future; | 1125 return future; |
1159 } | 1126 } |
1160 | 1127 |
1161 /** | 1128 /** |
1162 * Finds the last element in this stream matching [test]. | 1129 * Finds the last element in this stream matching [test]. |
1163 * | 1130 * |
1164 * As [firstWhere], except that the last matching element is found. | 1131 * As [firstWhere], except that the last matching element is found. |
1165 * That means that the result cannot be provided before this stream | 1132 * That means that the result cannot be provided before this stream |
1166 * is done. | 1133 * is done. |
1167 */ | 1134 */ |
1168 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | 1135 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { |
1169 _Future<dynamic> future = new _Future(); | 1136 _Future<dynamic> future = new _Future(); |
1170 T result = null; | 1137 T result = null; |
1171 bool foundResult = false; | 1138 bool foundResult = false; |
1172 StreamSubscription subscription; | 1139 StreamSubscription subscription; |
1173 subscription = this.listen( | 1140 subscription = this.listen( |
1174 (T value) { | 1141 (T value) { |
1175 _runUserCode( | 1142 _runUserCode(() => true == test(value), (bool isMatch) { |
1176 () => true == test(value), | |
1177 (bool isMatch) { | |
1178 if (isMatch) { | 1143 if (isMatch) { |
1179 foundResult = true; | 1144 foundResult = true; |
1180 result = value; | 1145 result = value; |
1181 } | 1146 } |
1182 }, | 1147 }, _cancelAndErrorClosure(subscription, future)); |
1183 _cancelAndErrorClosure(subscription, future) | 1148 }, |
1184 ); | 1149 onError: future._completeError, |
1185 }, | 1150 onDone: () { |
1186 onError: future._completeError, | 1151 if (foundResult) { |
1187 onDone: () { | 1152 future._complete(result); |
1188 if (foundResult) { | 1153 return; |
1189 future._complete(result); | 1154 } |
1190 return; | 1155 if (defaultValue != null) { |
1191 } | 1156 _runUserCode(defaultValue, future._complete, future._completeError); |
1192 if (defaultValue != null) { | 1157 return; |
1193 _runUserCode(defaultValue, future._complete, future._completeError); | 1158 } |
1194 return; | 1159 try { |
1195 } | 1160 throw IterableElementError.noElement(); |
1196 try { | 1161 } catch (e, s) { |
1197 throw IterableElementError.noElement(); | 1162 _completeWithErrorCallback(future, e, s); |
1198 } catch (e, s) { | 1163 } |
1199 _completeWithErrorCallback(future, e, s); | 1164 }, |
1200 } | 1165 cancelOnError: true); |
1201 }, | |
1202 cancelOnError: true); | |
1203 return future; | 1166 return future; |
1204 } | 1167 } |
1205 | 1168 |
1206 /** | 1169 /** |
1207 * Finds the single element in this stream matching [test]. | 1170 * Finds the single element in this stream matching [test]. |
1208 * | 1171 * |
1209 * Like [lastMatch], except that it is an error if more than one | 1172 * Like [lastMatch], except that it is an error if more than one |
1210 * matching element occurs in the stream. | 1173 * matching element occurs in the stream. |
1211 */ | 1174 */ |
1212 Future<T> singleWhere(bool test(T element)) { | 1175 Future<T> singleWhere(bool test(T element)) { |
1213 _Future<T> future = new _Future<T>(); | 1176 _Future<T> future = new _Future<T>(); |
1214 T result = null; | 1177 T result = null; |
1215 bool foundResult = false; | 1178 bool foundResult = false; |
1216 StreamSubscription subscription; | 1179 StreamSubscription subscription; |
1217 subscription = this.listen( | 1180 subscription = this.listen( |
1218 (T value) { | 1181 (T value) { |
1219 _runUserCode( | 1182 _runUserCode(() => true == test(value), (bool isMatch) { |
1220 () => true == test(value), | |
1221 (bool isMatch) { | |
1222 if (isMatch) { | 1183 if (isMatch) { |
1223 if (foundResult) { | 1184 if (foundResult) { |
1224 try { | 1185 try { |
1225 throw IterableElementError.tooMany(); | 1186 throw IterableElementError.tooMany(); |
1226 } catch (e, s) { | 1187 } catch (e, s) { |
1227 _cancelAndErrorWithReplacement(subscription, future, e, s); | 1188 _cancelAndErrorWithReplacement(subscription, future, e, s); |
1228 } | 1189 } |
1229 return; | 1190 return; |
1230 } | 1191 } |
1231 foundResult = true; | 1192 foundResult = true; |
1232 result = value; | 1193 result = value; |
1233 } | 1194 } |
1234 }, | 1195 }, _cancelAndErrorClosure(subscription, future)); |
1235 _cancelAndErrorClosure(subscription, future) | 1196 }, |
1236 ); | 1197 onError: future._completeError, |
1237 }, | 1198 onDone: () { |
1238 onError: future._completeError, | 1199 if (foundResult) { |
1239 onDone: () { | 1200 future._complete(result); |
1240 if (foundResult) { | 1201 return; |
1241 future._complete(result); | 1202 } |
1242 return; | 1203 try { |
1243 } | 1204 throw IterableElementError.noElement(); |
1244 try { | 1205 } catch (e, s) { |
1245 throw IterableElementError.noElement(); | 1206 _completeWithErrorCallback(future, e, s); |
1246 } catch (e, s) { | 1207 } |
1247 _completeWithErrorCallback(future, e, s); | 1208 }, |
1248 } | 1209 cancelOnError: true); |
1249 }, | |
1250 cancelOnError: true); | |
1251 return future; | 1210 return future; |
1252 } | 1211 } |
1253 | 1212 |
1254 /** | 1213 /** |
1255 * Returns the value of the [index]th data event of this stream. | 1214 * Returns the value of the [index]th data event of this stream. |
1256 * | 1215 * |
1257 * Stops listening to the stream after the [index]th data event has been | 1216 * Stops listening to the stream after the [index]th data event has been |
1258 * received. | 1217 * received. |
1259 * | 1218 * |
1260 * Internally the method cancels its subscription after these elements. This | 1219 * Internally the method cancels its subscription after these elements. This |
1261 * means that single-subscription (non-broadcast) streams are closed and | 1220 * means that single-subscription (non-broadcast) streams are closed and |
1262 * cannot be reused after a call to this method. | 1221 * cannot be reused after a call to this method. |
1263 * | 1222 * |
1264 * If an error event occurs before the value is found, the future completes | 1223 * If an error event occurs before the value is found, the future completes |
1265 * with this error. | 1224 * with this error. |
1266 * | 1225 * |
1267 * If a done event occurs before the value is found, the future completes | 1226 * If a done event occurs before the value is found, the future completes |
1268 * with a [RangeError]. | 1227 * with a [RangeError]. |
1269 */ | 1228 */ |
1270 Future<T> elementAt(int index) { | 1229 Future<T> elementAt(int index) { |
1271 if (index is! int || index < 0) throw new ArgumentError(index); | 1230 if (index is! int || index < 0) throw new ArgumentError(index); |
1272 _Future<T> future = new _Future<T>(); | 1231 _Future<T> future = new _Future<T>(); |
1273 StreamSubscription subscription; | 1232 StreamSubscription subscription; |
1274 int elementIndex = 0; | 1233 int elementIndex = 0; |
1275 subscription = this.listen( | 1234 subscription = this.listen( |
1276 (T value) { | 1235 (T value) { |
1277 if (index == elementIndex) { | 1236 if (index == elementIndex) { |
1278 _cancelAndValue(subscription, future, value); | 1237 _cancelAndValue(subscription, future, value); |
1279 return; | 1238 return; |
1280 } | 1239 } |
1281 elementIndex += 1; | 1240 elementIndex += 1; |
1282 }, | 1241 }, |
1283 onError: future._completeError, | 1242 onError: future._completeError, |
1284 onDone: () { | 1243 onDone: () { |
1285 future._completeError( | 1244 future._completeError( |
1286 new RangeError.index(index, this, "index", null, elementIndex)); | 1245 new RangeError.index(index, this, "index", null, elementIndex)); |
1287 }, | 1246 }, |
1288 cancelOnError: true); | 1247 cancelOnError: true); |
1289 return future; | 1248 return future; |
1290 } | 1249 } |
1291 | 1250 |
1292 /** | 1251 /** |
1293 * Creates a new stream with the same events as this stream. | 1252 * Creates a new stream with the same events as this stream. |
1294 * | 1253 * |
1295 * Whenever more than [timeLimit] passes between two events from this stream, | 1254 * Whenever more than [timeLimit] passes between two events from this stream, |
1296 * the [onTimeout] function is called. | 1255 * the [onTimeout] function is called. |
1297 * | 1256 * |
1298 * The countdown doesn't start until the returned stream is listened to. | 1257 * The countdown doesn't start until the returned stream is listened to. |
(...skipping 18 matching lines...) Expand all Loading... |
1317 StreamSubscription<T> subscription; | 1276 StreamSubscription<T> subscription; |
1318 Timer timer; | 1277 Timer timer; |
1319 Zone zone; | 1278 Zone zone; |
1320 _TimerCallback timeout; | 1279 _TimerCallback timeout; |
1321 | 1280 |
1322 void onData(T event) { | 1281 void onData(T event) { |
1323 timer.cancel(); | 1282 timer.cancel(); |
1324 controller.add(event); | 1283 controller.add(event); |
1325 timer = zone.createTimer(timeLimit, timeout); | 1284 timer = zone.createTimer(timeLimit, timeout); |
1326 } | 1285 } |
| 1286 |
1327 void onError(error, StackTrace stackTrace) { | 1287 void onError(error, StackTrace stackTrace) { |
1328 timer.cancel(); | 1288 timer.cancel(); |
1329 assert(controller is _StreamController || | 1289 assert(controller is _StreamController || |
1330 controller is _BroadcastStreamController); | 1290 controller is _BroadcastStreamController); |
1331 dynamic eventSink = controller; | 1291 dynamic eventSink = controller; |
1332 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | 1292 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
1333 timer = zone.createTimer(timeLimit, timeout); | 1293 timer = zone.createTimer(timeLimit, timeout); |
1334 } | 1294 } |
| 1295 |
1335 void onDone() { | 1296 void onDone() { |
1336 timer.cancel(); | 1297 timer.cancel(); |
1337 controller.close(); | 1298 controller.close(); |
1338 } | 1299 } |
| 1300 |
1339 void onListen() { | 1301 void onListen() { |
1340 // This is the onListen callback for of controller. | 1302 // This is the onListen callback for of controller. |
1341 // It runs in the same zone that the subscription was created in. | 1303 // It runs in the same zone that the subscription was created in. |
1342 // Use that zone for creating timers and running the onTimeout | 1304 // Use that zone for creating timers and running the onTimeout |
1343 // callback. | 1305 // callback. |
1344 zone = Zone.current; | 1306 zone = Zone.current; |
1345 if (onTimeout == null) { | 1307 if (onTimeout == null) { |
1346 timeout = () { | 1308 timeout = () { |
1347 controller.addError(new TimeoutException("No stream event", | 1309 controller.addError( |
1348 timeLimit), null); | 1310 new TimeoutException("No stream event", timeLimit), null); |
1349 }; | 1311 }; |
1350 } else { | 1312 } else { |
1351 // TODO(floitsch): the return type should be 'void', and the type | 1313 // TODO(floitsch): the return type should be 'void', and the type |
1352 // should be inferred. | 1314 // should be inferred. |
1353 var registeredOnTimeout = | 1315 var registeredOnTimeout = |
1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); | 1316 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); |
1355 _ControllerEventSinkWrapper wrapper = | 1317 _ControllerEventSinkWrapper wrapper = |
1356 new _ControllerEventSinkWrapper(null); | 1318 new _ControllerEventSinkWrapper(null); |
1357 timeout = () { | 1319 timeout = () { |
1358 wrapper._sink = controller; // Only valid during call. | 1320 wrapper._sink = controller; // Only valid during call. |
1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | 1321 zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
1360 wrapper._sink = null; | 1322 wrapper._sink = null; |
1361 }; | 1323 }; |
1362 } | 1324 } |
1363 | 1325 |
1364 subscription = this.listen(onData, onError: onError, onDone: onDone); | 1326 subscription = this.listen(onData, onError: onError, onDone: onDone); |
1365 timer = zone.createTimer(timeLimit, timeout); | 1327 timer = zone.createTimer(timeLimit, timeout); |
1366 } | 1328 } |
| 1329 |
1367 Future onCancel() { | 1330 Future onCancel() { |
1368 timer.cancel(); | 1331 timer.cancel(); |
1369 Future result = subscription.cancel(); | 1332 Future result = subscription.cancel(); |
1370 subscription = null; | 1333 subscription = null; |
1371 return result; | 1334 return result; |
1372 } | 1335 } |
| 1336 |
1373 controller = isBroadcast | 1337 controller = isBroadcast |
1374 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 1338 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
1375 : new _SyncStreamController<T>( | 1339 : new _SyncStreamController<T>(onListen, () { |
1376 onListen, | 1340 // Don't null the timer, onCancel may call cancel again. |
1377 () { | 1341 timer.cancel(); |
1378 // Don't null the timer, onCancel may call cancel again. | 1342 subscription.pause(); |
1379 timer.cancel(); | 1343 }, () { |
1380 subscription.pause(); | 1344 subscription.resume(); |
1381 }, | 1345 timer = zone.createTimer(timeLimit, timeout); |
1382 () { | 1346 }, onCancel); |
1383 subscription.resume(); | |
1384 timer = zone.createTimer(timeLimit, timeout); | |
1385 }, | |
1386 onCancel); | |
1387 return controller.stream; | 1347 return controller.stream; |
1388 } | 1348 } |
1389 } | 1349 } |
1390 | 1350 |
1391 /** | 1351 /** |
1392 * A subscription on events from a [Stream]. | 1352 * A subscription on events from a [Stream]. |
1393 * | 1353 * |
1394 * When you listen on a [Stream] using [Stream.listen], | 1354 * When you listen on a [Stream] using [Stream.listen], |
1395 * a [StreamSubscription] object is returned. | 1355 * a [StreamSubscription] object is returned. |
1396 * | 1356 * |
(...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1494 * | 1454 * |
1495 * In case of an error the subscription will automatically cancel (even | 1455 * In case of an error the subscription will automatically cancel (even |
1496 * when it was listening with `cancelOnError` set to `false`). | 1456 * when it was listening with `cancelOnError` set to `false`). |
1497 * | 1457 * |
1498 * In case of a `done` event the future completes with the given | 1458 * In case of a `done` event the future completes with the given |
1499 * [futureValue]. | 1459 * [futureValue]. |
1500 */ | 1460 */ |
1501 Future<E> asFuture<E>([E futureValue]); | 1461 Future<E> asFuture<E>([E futureValue]); |
1502 } | 1462 } |
1503 | 1463 |
1504 | |
1505 /** | 1464 /** |
1506 * An interface that abstracts creation or handling of [Stream] events. | 1465 * An interface that abstracts creation or handling of [Stream] events. |
1507 */ | 1466 */ |
1508 abstract class EventSink<T> implements Sink<T> { | 1467 abstract class EventSink<T> implements Sink<T> { |
1509 /** Send a data event to a stream. */ | 1468 /** Send a data event to a stream. */ |
1510 void add(T event); | 1469 void add(T event); |
1511 | 1470 |
1512 /** Send an async error to a stream. */ | 1471 /** Send an async error to a stream. */ |
1513 void addError(errorEvent, [StackTrace stackTrace]); | 1472 void addError(errorEvent, [StackTrace stackTrace]); |
1514 | 1473 |
1515 /** Close the sink. No further events can be added after closing. */ | 1474 /** Close the sink. No further events can be added after closing. */ |
1516 void close(); | 1475 void close(); |
1517 } | 1476 } |
1518 | 1477 |
1519 | |
1520 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1478 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
1521 class StreamView<T> extends Stream<T> { | 1479 class StreamView<T> extends Stream<T> { |
1522 final Stream<T> _stream; | 1480 final Stream<T> _stream; |
1523 | 1481 |
1524 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); | 1482 const StreamView(Stream<T> stream) |
| 1483 : _stream = stream, |
| 1484 super._internal(); |
1525 | 1485 |
1526 bool get isBroadcast => _stream.isBroadcast; | 1486 bool get isBroadcast => _stream.isBroadcast; |
1527 | 1487 |
1528 Stream<T> asBroadcastStream( | 1488 Stream<T> asBroadcastStream( |
1529 {void onListen(StreamSubscription<T> subscription), | 1489 {void onListen(StreamSubscription<T> subscription), |
1530 void onCancel(StreamSubscription<T> subscription)}) | 1490 void onCancel(StreamSubscription<T> subscription)}) => |
1531 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 1491 _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
1532 | 1492 |
1533 StreamSubscription<T> listen(void onData(T value), | 1493 StreamSubscription<T> listen(void onData(T value), |
1534 { Function onError, | 1494 {Function onError, void onDone(), bool cancelOnError}) { |
1535 void onDone(), | 1495 return _stream.listen(onData, |
1536 bool cancelOnError }) { | 1496 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
1537 return _stream.listen(onData, onError: onError, onDone: onDone, | |
1538 cancelOnError: cancelOnError); | |
1539 } | 1497 } |
1540 } | 1498 } |
1541 | 1499 |
1542 | |
1543 /** | 1500 /** |
1544 * Abstract interface for a "sink" accepting multiple entire streams. | 1501 * Abstract interface for a "sink" accepting multiple entire streams. |
1545 * | 1502 * |
1546 * A consumer can accept a number of consecutive streams using [addStream], | 1503 * A consumer can accept a number of consecutive streams using [addStream], |
1547 * and when no further data need to be added, the [close] method tells the | 1504 * and when no further data need to be added, the [close] method tells the |
1548 * consumer to complete its work and shut down. | 1505 * consumer to complete its work and shut down. |
1549 * | 1506 * |
1550 * This class is not just a [Sink<Stream>] because it is also combined with | 1507 * This class is not just a [Sink<Stream>] because it is also combined with |
1551 * other [Sink] classes, like it's combined with [EventSink] in the | 1508 * other [Sink] classes, like it's combined with [EventSink] in the |
1552 * [StreamSink] class. | 1509 * [StreamSink] class. |
(...skipping 29 matching lines...) Expand all Loading... |
1582 * This allows the consumer to complete any remaining work and release | 1539 * This allows the consumer to complete any remaining work and release |
1583 * resources that are no longer needed | 1540 * resources that are no longer needed |
1584 * | 1541 * |
1585 * Returns a future which is completed when the consumer has shut down. | 1542 * Returns a future which is completed when the consumer has shut down. |
1586 * If cleaning up can fail, the error may be reported in the returned future, | 1543 * If cleaning up can fail, the error may be reported in the returned future, |
1587 * otherwise it completes with `null`. | 1544 * otherwise it completes with `null`. |
1588 */ | 1545 */ |
1589 Future close(); | 1546 Future close(); |
1590 } | 1547 } |
1591 | 1548 |
1592 | |
1593 /** | 1549 /** |
1594 * A object that accepts stream events both synchronously and asynchronously. | 1550 * A object that accepts stream events both synchronously and asynchronously. |
1595 * | 1551 * |
1596 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and | 1552 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and |
1597 * the synchronous methods from [EventSink]. | 1553 * the synchronous methods from [EventSink]. |
1598 * | 1554 * |
1599 * The [EventSink] methods can't be used while the [addStream] is called. | 1555 * The [EventSink] methods can't be used while the [addStream] is called. |
1600 * As soon as the [addStream]'s [Future] completes with a value, the | 1556 * As soon as the [addStream]'s [Future] completes with a value, the |
1601 * [EventSink] methods can be used again. | 1557 * [EventSink] methods can be used again. |
1602 * | 1558 * |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1641 * | 1597 * |
1642 * Otherwise, the returned future will complete when either: | 1598 * Otherwise, the returned future will complete when either: |
1643 * | 1599 * |
1644 * * all events have been processed and the sink has been closed, or | 1600 * * all events have been processed and the sink has been closed, or |
1645 * * the sink has otherwise been stopped from handling more events | 1601 * * the sink has otherwise been stopped from handling more events |
1646 * (for example by cancelling a stream subscription). | 1602 * (for example by cancelling a stream subscription). |
1647 */ | 1603 */ |
1648 Future get done; | 1604 Future get done; |
1649 } | 1605 } |
1650 | 1606 |
1651 | |
1652 /** | 1607 /** |
1653 * The target of a [Stream.transform] call. | 1608 * The target of a [Stream.transform] call. |
1654 * | 1609 * |
1655 * The [Stream.transform] call will pass itself to this object and then return | 1610 * The [Stream.transform] call will pass itself to this object and then return |
1656 * the resulting stream. | 1611 * the resulting stream. |
1657 * | 1612 * |
1658 * It is good practice to write transformers that can be used multiple times. | 1613 * It is good practice to write transformers that can be used multiple times. |
1659 */ | 1614 */ |
1660 abstract class StreamTransformer<S, T> { | 1615 abstract class StreamTransformer<S, T> { |
1661 /** | 1616 /** |
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1715 * cancelOnError: cancelOnError); | 1670 * cancelOnError: cancelOnError); |
1716 * }, | 1671 * }, |
1717 * onPause: () { subscription.pause(); }, | 1672 * onPause: () { subscription.pause(); }, |
1718 * onResume: () { subscription.resume(); }, | 1673 * onResume: () { subscription.resume(); }, |
1719 * onCancel: () => subscription.cancel(), | 1674 * onCancel: () => subscription.cancel(), |
1720 * sync: true); | 1675 * sync: true); |
1721 * return controller.stream.listen(null); | 1676 * return controller.stream.listen(null); |
1722 * }); | 1677 * }); |
1723 */ | 1678 */ |
1724 const factory StreamTransformer( | 1679 const factory StreamTransformer( |
1725 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | 1680 StreamSubscription<T> transformer( |
1726 = _StreamSubscriptionTransformer<S, T>; | 1681 Stream<S> stream, bool cancelOnError)) = |
| 1682 _StreamSubscriptionTransformer<S, T>; |
1727 | 1683 |
1728 /** | 1684 /** |
1729 * Creates a [StreamTransformer] that delegates events to the given functions. | 1685 * Creates a [StreamTransformer] that delegates events to the given functions. |
1730 * | 1686 * |
1731 * Example use of a duplicating transformer: | 1687 * Example use of a duplicating transformer: |
1732 * | 1688 * |
1733 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1734 * handleData: (String value, EventSink<String> sink) { | 1690 * handleData: (String value, EventSink<String> sink) { |
1735 * sink.add(value); | 1691 * sink.add(value); |
1736 * sink.add(value); // Duplicate the incoming events. | 1692 * sink.add(value); // Duplicate the incoming events. |
1737 * })); | 1693 * })); |
1738 */ | 1694 */ |
1739 factory StreamTransformer.fromHandlers({ | 1695 factory StreamTransformer.fromHandlers( |
1740 void handleData(S data, EventSink<T> sink), | 1696 {void handleData(S data, EventSink<T> sink), |
1741 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1742 void handleDone(EventSink<T> sink)}) | 1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
1743 = _StreamHandlerTransformer<S, T>; | |
1744 | 1699 |
1745 /** | 1700 /** |
1746 * Transform the incoming [stream]'s events. | 1701 * Transform the incoming [stream]'s events. |
1747 * | 1702 * |
1748 * Creates a new stream. | 1703 * Creates a new stream. |
1749 * When this stream is listened to, it will start listening on [stream], | 1704 * When this stream is listened to, it will start listening on [stream], |
1750 * and generate events on the new stream based on the events from [stream]. | 1705 * and generate events on the new stream based on the events from [stream]. |
1751 * | 1706 * |
1752 * Subscriptions on the returned stream should propagate pause state | 1707 * Subscriptions on the returned stream should propagate pause state |
1753 * to the subscription on [stream]. | 1708 * to the subscription on [stream]. |
1754 */ | 1709 */ |
1755 Stream<T> bind(Stream<S> stream); | 1710 Stream<T> bind(Stream<S> stream); |
1756 } | 1711 } |
1757 | 1712 |
1758 /** | 1713 /** |
1759 * An [Iterator] like interface for the values of a [Stream]. | 1714 * An [Iterator] like interface for the values of a [Stream]. |
1760 * | 1715 * |
1761 * This wraps a [Stream] and a subscription on the stream. It listens | 1716 * This wraps a [Stream] and a subscription on the stream. It listens |
1762 * on the stream, and completes the future returned by [moveNext] when the | 1717 * on the stream, and completes the future returned by [moveNext] when the |
1763 * next value becomes available. | 1718 * next value becomes available. |
1764 * | 1719 * |
1765 * The stream may be paused between calls to [moveNext]. | 1720 * The stream may be paused between calls to [moveNext]. |
1766 */ | 1721 */ |
1767 abstract class StreamIterator<T> { | 1722 abstract class StreamIterator<T> { |
1768 | |
1769 /** Create a [StreamIterator] on [stream]. */ | 1723 /** Create a [StreamIterator] on [stream]. */ |
1770 factory StreamIterator(Stream<T> stream) | 1724 factory StreamIterator(Stream<T> stream) |
1771 // TODO(lrn): use redirecting factory constructor when type | 1725 // TODO(lrn): use redirecting factory constructor when type |
1772 // arguments are supported. | 1726 // arguments are supported. |
1773 => new _StreamIterator<T>(stream); | 1727 => |
| 1728 new _StreamIterator<T>(stream); |
1774 | 1729 |
1775 /** | 1730 /** |
1776 * Wait for the next stream value to be available. | 1731 * Wait for the next stream value to be available. |
1777 * | 1732 * |
1778 * Returns a future which will complete with either `true` or `false`. | 1733 * Returns a future which will complete with either `true` or `false`. |
1779 * Completing with `true` means that another event has been received and | 1734 * Completing with `true` means that another event has been received and |
1780 * can be read as [current]. | 1735 * can be read as [current]. |
1781 * Completing with `false` means that the stream iteration is done and | 1736 * Completing with `false` means that the stream iteration is done and |
1782 * no further events will ever be available. | 1737 * no further events will ever be available. |
1783 * The future may complete with an error, if the stream produces an error, | 1738 * The future may complete with an error, if the stream produces an error, |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1815 * If [moveNext] has been called when the iterator is canceled, | 1770 * If [moveNext] has been called when the iterator is canceled, |
1816 * its returned future will complete with `false` as value, | 1771 * its returned future will complete with `false` as value, |
1817 * as will all further calls to [moveNext]. | 1772 * as will all further calls to [moveNext]. |
1818 * | 1773 * |
1819 * Returns a future if the cancel-operation is not completed synchronously. | 1774 * Returns a future if the cancel-operation is not completed synchronously. |
1820 * Otherwise returns `null`. | 1775 * Otherwise returns `null`. |
1821 */ | 1776 */ |
1822 Future cancel(); | 1777 Future cancel(); |
1823 } | 1778 } |
1824 | 1779 |
1825 | |
1826 /** | 1780 /** |
1827 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 1781 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
1828 */ | 1782 */ |
1829 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1783 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1830 EventSink _sink; | 1784 EventSink _sink; |
1831 _ControllerEventSinkWrapper(this._sink); | 1785 _ControllerEventSinkWrapper(this._sink); |
1832 | 1786 |
1833 void add(T data) { _sink.add(data); } | 1787 void add(T data) { |
| 1788 _sink.add(data); |
| 1789 } |
| 1790 |
1834 void addError(error, [StackTrace stackTrace]) { | 1791 void addError(error, [StackTrace stackTrace]) { |
1835 _sink.addError(error, stackTrace); | 1792 _sink.addError(error, stackTrace); |
1836 } | 1793 } |
1837 void close() { _sink.close(); } | 1794 |
| 1795 void close() { |
| 1796 _sink.close(); |
| 1797 } |
1838 } | 1798 } |
OLD | NEW |