| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 86 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 97 * | 97 * |
| 98 * When the future completes, the stream will fire one event, either | 98 * When the future completes, the stream will fire one event, either |
| 99 * data or error, and then close with a done-event. | 99 * data or error, and then close with a done-event. |
| 100 */ | 100 */ |
| 101 factory Stream.fromFuture(Future<T> future) { | 101 factory Stream.fromFuture(Future<T> future) { |
| 102 // Use the controller's buffering to fill in the value even before | 102 // Use the controller's buffering to fill in the value even before |
| 103 // the stream has a listener. For a single value, it's not worth it | 103 // the stream has a listener. For a single value, it's not worth it |
| 104 // to wait for a listener before doing the `then` on the future. | 104 // to wait for a listener before doing the `then` on the future. |
| 105 _StreamController<T> controller = new StreamController<T>(sync: true); | 105 _StreamController<T> controller = new StreamController<T>(sync: true); |
| 106 future.then((value) { | 106 future.then((value) { |
| 107 controller._add(value); | 107 controller._add(value); |
| 108 controller._closeUnchecked(); | 108 controller._closeUnchecked(); |
| 109 }, | 109 }, onError: (error, stackTrace) { |
| 110 onError: (error, stackTrace) { | 110 controller._addError(error, stackTrace); |
| 111 controller._addError(error, stackTrace); | 111 controller._closeUnchecked(); |
| 112 controller._closeUnchecked(); | 112 }); |
| 113 }); | |
| 114 return controller.stream; | 113 return controller.stream; |
| 115 } | 114 } |
| 116 | 115 |
| 117 /** | 116 /** |
| 118 * Create a stream from a group of futures. | 117 * Create a stream from a group of futures. |
| 119 * | 118 * |
| 120 * The stream reports the results of the futures on the stream in the order | 119 * The stream reports the results of the futures on the stream in the order |
| 121 * in which the futures complete. | 120 * in which the futures complete. |
| 122 * | 121 * |
| 123 * If some futures have completed before calling `Stream.fromFutures`, | 122 * If some futures have completed before calling `Stream.fromFutures`, |
| (...skipping 50 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 174 /** | 173 /** |
| 175 * Creates a stream that repeatedly emits events at [period] intervals. | 174 * Creates a stream that repeatedly emits events at [period] intervals. |
| 176 * | 175 * |
| 177 * The event values are computed by invoking [computation]. The argument to | 176 * The event values are computed by invoking [computation]. The argument to |
| 178 * this callback is an integer that starts with 0 and is incremented for | 177 * this callback is an integer that starts with 0 and is incremented for |
| 179 * every event. | 178 * every event. |
| 180 * | 179 * |
| 181 * If [computation] is omitted the event values will all be `null`. | 180 * If [computation] is omitted the event values will all be `null`. |
| 182 */ | 181 */ |
| 183 factory Stream.periodic(Duration period, | 182 factory Stream.periodic(Duration period, |
| 184 [T computation(int computationCount)]) { | 183 [T computation(int computationCount)]) { |
| 185 Timer timer; | 184 Timer timer; |
| 186 int computationCount = 0; | 185 int computationCount = 0; |
| 187 StreamController<T> controller; | 186 StreamController<T> controller; |
| 188 // Counts the time that the Stream was running (and not paused). | 187 // Counts the time that the Stream was running (and not paused). |
| 189 Stopwatch watch = new Stopwatch(); | 188 Stopwatch watch = new Stopwatch(); |
| 190 | 189 |
| 191 void sendEvent() { | 190 void sendEvent() { |
| 192 watch.reset(); | 191 watch.reset(); |
| 193 T data; | 192 T data; |
| 194 if (computation != null) { | 193 if (computation != null) { |
| 195 try { | 194 try { |
| 196 data = computation(computationCount++); | 195 data = computation(computationCount++); |
| 197 } catch (e, s) { | 196 } catch (e, s) { |
| 198 controller.addError(e, s); | 197 controller.addError(e, s); |
| 199 return; | 198 return; |
| 200 } | 199 } |
| 201 } | 200 } |
| 202 controller.add(data); | 201 controller.add(data); |
| 203 } | 202 } |
| 204 | 203 |
| 205 void startPeriodicTimer() { | 204 void startPeriodicTimer() { |
| 206 assert(timer == null); | 205 assert(timer == null); |
| 207 timer = new Timer.periodic(period, (Timer timer) { | 206 timer = new Timer.periodic(period, (Timer timer) { |
| 208 sendEvent(); | 207 sendEvent(); |
| 209 }); | 208 }); |
| 210 } | 209 } |
| 211 | 210 |
| 212 controller = new StreamController<T>(sync: true, | 211 controller = new StreamController<T>( |
| 212 sync: true, |
| 213 onListen: () { | 213 onListen: () { |
| 214 watch.start(); | 214 watch.start(); |
| 215 startPeriodicTimer(); | 215 startPeriodicTimer(); |
| 216 }, | 216 }, |
| 217 onPause: () { | 217 onPause: () { |
| 218 timer.cancel(); | 218 timer.cancel(); |
| 219 timer = null; | 219 timer = null; |
| 220 watch.stop(); | 220 watch.stop(); |
| 221 }, | 221 }, |
| 222 onResume: () { | 222 onResume: () { |
| (...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 270 * // Some generic types ommitted for brevety. | 270 * // Some generic types ommitted for brevety. |
| 271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( | 271 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( |
| 272 * stream, | 272 * stream, |
| 273 * (EventSink sink) => new DuplicationSink(sink)); | 273 * (EventSink sink) => new DuplicationSink(sink)); |
| 274 * } | 274 * } |
| 275 * | 275 * |
| 276 * stringStream.transform(new DuplicationTransformer()); | 276 * stringStream.transform(new DuplicationTransformer()); |
| 277 * | 277 * |
| 278 * The resulting stream is a broadcast stream if [source] is. | 278 * The resulting stream is a broadcast stream if [source] is. |
| 279 */ | 279 */ |
| 280 factory Stream.eventTransformed(Stream source, | 280 factory Stream.eventTransformed( |
| 281 EventSink mapSink(EventSink<T> sink)) { | 281 Stream source, EventSink mapSink(EventSink<T> sink)) { |
| 282 return new _BoundSinkStream(source, mapSink); | 282 return new _BoundSinkStream(source, mapSink); |
| 283 } | 283 } |
| 284 | 284 |
| 285 /** | 285 /** |
| 286 * Reports whether this stream is a broadcast stream. | 286 * Reports whether this stream is a broadcast stream. |
| 287 */ | 287 */ |
| 288 bool get isBroadcast => false; | 288 bool get isBroadcast => false; |
| 289 | 289 |
| 290 /** | 290 /** |
| 291 * Returns a multi-subscription stream that produces the same events as this. | 291 * Returns a multi-subscription stream that produces the same events as this. |
| 292 * | 292 * |
| 293 * The returned stream will subscribe to this stream when its first | 293 * The returned stream will subscribe to this stream when its first |
| 294 * subscriber is added, and will stay subscribed until this stream ends, | 294 * subscriber is added, and will stay subscribed until this stream ends, |
| 295 * or a callback cancels the subscription. | 295 * or a callback cancels the subscription. |
| 296 * | 296 * |
| 297 * If [onListen] is provided, it is called with a subscription-like object | 297 * If [onListen] is provided, it is called with a subscription-like object |
| 298 * that represents the underlying subscription to this stream. It is | 298 * that represents the underlying subscription to this stream. It is |
| 299 * possible to pause, resume or cancel the subscription during the call | 299 * possible to pause, resume or cancel the subscription during the call |
| 300 * to [onListen]. It is not possible to change the event handlers, including | 300 * to [onListen]. It is not possible to change the event handlers, including |
| 301 * using [StreamSubscription.asFuture]. | 301 * using [StreamSubscription.asFuture]. |
| 302 * | 302 * |
| 303 * If [onCancel] is provided, it is called in a similar way to [onListen] | 303 * If [onCancel] is provided, it is called in a similar way to [onListen] |
| 304 * when the returned stream stops having listener. If it later gets | 304 * when the returned stream stops having listener. If it later gets |
| 305 * a new listener, the [onListen] function is called again. | 305 * a new listener, the [onListen] function is called again. |
| 306 * | 306 * |
| 307 * Use the callbacks, for example, for pausing the underlying subscription | 307 * Use the callbacks, for example, for pausing the underlying subscription |
| 308 * while having no subscribers to prevent losing events, or canceling the | 308 * while having no subscribers to prevent losing events, or canceling the |
| 309 * subscription when there are no listeners. | 309 * subscription when there are no listeners. |
| 310 */ | 310 */ |
| 311 Stream<T> asBroadcastStream({ | 311 Stream<T> asBroadcastStream( |
| 312 void onListen(StreamSubscription<T> subscription), | 312 {void onListen(StreamSubscription<T> subscription), |
| 313 void onCancel(StreamSubscription<T> subscription) }) { | 313 void onCancel(StreamSubscription<T> subscription)}) { |
| 314 return new _AsBroadcastStream<T>(this, onListen, onCancel); | 314 return new _AsBroadcastStream<T>(this, onListen, onCancel); |
| 315 } | 315 } |
| 316 | 316 |
| 317 /** | 317 /** |
| 318 * Adds a subscription to this stream. | 318 * Adds a subscription to this stream. |
| 319 * | 319 * |
| 320 * Returns a [StreamSubscription] which handles events from the stream using | 320 * Returns a [StreamSubscription] which handles events from the stream using |
| 321 * the provided [onData], [onError] and [onDone] handlers. | 321 * the provided [onData], [onError] and [onDone] handlers. |
| 322 * The handlers can be changed on the subscription, but they start out | 322 * The handlers can be changed on the subscription, but they start out |
| 323 * as the provided functions. | 323 * as the provided functions. |
| (...skipping 19 matching lines...) Expand all Loading... |
| 343 * called. If [onDone] is `null`, nothing happens. | 343 * called. If [onDone] is `null`, nothing happens. |
| 344 * | 344 * |
| 345 * If [cancelOnError] is true, the subscription is automatically cancelled | 345 * If [cancelOnError] is true, the subscription is automatically cancelled |
| 346 * when the first error event is delivered. The default is `false`. | 346 * when the first error event is delivered. The default is `false`. |
| 347 * | 347 * |
| 348 * While a subscription is paused, or when it has been cancelled, | 348 * While a subscription is paused, or when it has been cancelled, |
| 349 * the subscription doesn't receive events and none of the | 349 * the subscription doesn't receive events and none of the |
| 350 * event handler functions are called. | 350 * event handler functions are called. |
| 351 */ | 351 */ |
| 352 StreamSubscription<T> listen(void onData(T event), | 352 StreamSubscription<T> listen(void onData(T event), |
| 353 { Function onError, | 353 {Function onError, void onDone(), bool cancelOnError}); |
| 354 void onDone(), | |
| 355 bool cancelOnError}); | |
| 356 | 354 |
| 357 /** | 355 /** |
| 358 * Creates a new stream from this stream that discards some data events. | 356 * Creates a new stream from this stream that discards some data events. |
| 359 * | 357 * |
| 360 * The new stream sends the same error and done events as this stream, | 358 * The new stream sends the same error and done events as this stream, |
| 361 * but it only sends the data events that satisfy the [test]. | 359 * but it only sends the data events that satisfy the [test]. |
| 362 * | 360 * |
| 363 * The returned stream is a broadcast stream if this stream is. | 361 * The returned stream is a broadcast stream if this stream is. |
| 364 * If a broadcast stream is listened to more than once, each subscription | 362 * If a broadcast stream is listened to more than once, each subscription |
| 365 * will individually perform the `test`. | 363 * will individually perform the `test`. |
| (...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 398 * | 396 * |
| 399 * The returned stream is a broadcast stream if this stream is. | 397 * The returned stream is a broadcast stream if this stream is. |
| 400 */ | 398 */ |
| 401 Stream<E> asyncMap<E>(convert(T event)) { | 399 Stream<E> asyncMap<E>(convert(T event)) { |
| 402 StreamController<E> controller; | 400 StreamController<E> controller; |
| 403 StreamSubscription<T> subscription; | 401 StreamSubscription<T> subscription; |
| 404 | 402 |
| 405 void onListen() { | 403 void onListen() { |
| 406 final add = controller.add; | 404 final add = controller.add; |
| 407 assert(controller is _StreamController || | 405 assert(controller is _StreamController || |
| 408 controller is _BroadcastStreamController); | 406 controller is _BroadcastStreamController); |
| 409 final _EventSink<E> eventSink = | 407 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
| 410 controller as Object /*=_EventSink<E>*/; | |
| 411 final addError = eventSink._addError; | 408 final addError = eventSink._addError; |
| 412 subscription = this.listen( | 409 subscription = this.listen((T event) { |
| 413 (T event) { | 410 dynamic newValue; |
| 414 dynamic newValue; | 411 try { |
| 415 try { | 412 newValue = convert(event); |
| 416 newValue = convert(event); | 413 } catch (e, s) { |
| 417 } catch (e, s) { | 414 controller.addError(e, s); |
| 418 controller.addError(e, s); | 415 return; |
| 419 return; | 416 } |
| 420 } | 417 if (newValue is Future) { |
| 421 if (newValue is Future) { | 418 subscription.pause(); |
| 422 subscription.pause(); | 419 newValue |
| 423 newValue.then(add, onError: addError) | 420 .then(add, onError: addError) |
| 424 .whenComplete(subscription.resume); | 421 .whenComplete(subscription.resume); |
| 425 } else { | 422 } else { |
| 426 controller.add(newValue as Object/*=E*/); | 423 controller.add(newValue as Object/*=E*/); |
| 427 } | 424 } |
| 428 }, | 425 }, onError: addError, onDone: controller.close); |
| 429 onError: addError, | |
| 430 onDone: controller.close | |
| 431 ); | |
| 432 } | 426 } |
| 433 | 427 |
| 434 if (this.isBroadcast) { | 428 if (this.isBroadcast) { |
| 435 controller = new StreamController<E>.broadcast( | 429 controller = new StreamController<E>.broadcast( |
| 436 onListen: onListen, | 430 onListen: onListen, |
| 437 onCancel: () { subscription.cancel(); }, | 431 onCancel: () { |
| 438 sync: true | 432 subscription.cancel(); |
| 439 ); | 433 }, |
| 434 sync: true); |
| 440 } else { | 435 } else { |
| 441 controller = new StreamController<E>( | 436 controller = new StreamController<E>( |
| 442 onListen: onListen, | 437 onListen: onListen, |
| 443 onPause: () { subscription.pause(); }, | 438 onPause: () { |
| 444 onResume: () { subscription.resume(); }, | 439 subscription.pause(); |
| 445 onCancel: () => subscription.cancel(), | 440 }, |
| 446 sync: true | 441 onResume: () { |
| 447 ); | 442 subscription.resume(); |
| 443 }, |
| 444 onCancel: () => subscription.cancel(), |
| 445 sync: true); |
| 448 } | 446 } |
| 449 return controller.stream; | 447 return controller.stream; |
| 450 } | 448 } |
| 451 | 449 |
| 452 /** | 450 /** |
| 453 * Creates a new stream with the events of a stream per original event. | 451 * Creates a new stream with the events of a stream per original event. |
| 454 * | 452 * |
| 455 * This acts like [expand], except that [convert] returns a [Stream] | 453 * This acts like [expand], except that [convert] returns a [Stream] |
| 456 * instead of an [Iterable]. | 454 * instead of an [Iterable]. |
| 457 * The events of the returned stream becomes the events of the returned | 455 * The events of the returned stream becomes the events of the returned |
| 458 * stream, in the order they are produced. | 456 * stream, in the order they are produced. |
| 459 * | 457 * |
| 460 * If [convert] returns `null`, no value is put on the output stream, | 458 * If [convert] returns `null`, no value is put on the output stream, |
| 461 * just as if it returned an empty stream. | 459 * just as if it returned an empty stream. |
| 462 * | 460 * |
| 463 * The returned stream is a broadcast stream if this stream is. | 461 * The returned stream is a broadcast stream if this stream is. |
| 464 */ | 462 */ |
| 465 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { | 463 Stream<E> asyncExpand<E>(Stream<E> convert(T event)) { |
| 466 StreamController<E> controller; | 464 StreamController<E> controller; |
| 467 StreamSubscription<T> subscription; | 465 StreamSubscription<T> subscription; |
| 468 void onListen() { | 466 void onListen() { |
| 469 assert(controller is _StreamController || | 467 assert(controller is _StreamController || |
| 470 controller is _BroadcastStreamController); | 468 controller is _BroadcastStreamController); |
| 471 final _EventSink<E> eventSink = | 469 final _EventSink<E> eventSink = controller as Object/*=_EventSink<E>*/; |
| 472 controller as Object /*=_EventSink<E>*/; | 470 subscription = this.listen((T event) { |
| 473 subscription = this.listen( | 471 Stream<E> newStream; |
| 474 (T event) { | 472 try { |
| 475 Stream<E> newStream; | 473 newStream = convert(event); |
| 476 try { | 474 } catch (e, s) { |
| 477 newStream = convert(event); | 475 controller.addError(e, s); |
| 478 } catch (e, s) { | 476 return; |
| 479 controller.addError(e, s); | 477 } |
| 480 return; | 478 if (newStream != null) { |
| 481 } | 479 subscription.pause(); |
| 482 if (newStream != null) { | 480 controller.addStream(newStream).whenComplete(subscription.resume); |
| 483 subscription.pause(); | 481 } |
| 484 controller.addStream(newStream) | 482 }, |
| 485 .whenComplete(subscription.resume); | 483 onError: eventSink._addError, // Avoid Zone error replacement. |
| 486 } | 484 onDone: controller.close); |
| 487 }, | |
| 488 onError: eventSink._addError, // Avoid Zone error replacement. | |
| 489 onDone: controller.close | |
| 490 ); | |
| 491 } | 485 } |
| 486 |
| 492 if (this.isBroadcast) { | 487 if (this.isBroadcast) { |
| 493 controller = new StreamController<E>.broadcast( | 488 controller = new StreamController<E>.broadcast( |
| 494 onListen: onListen, | 489 onListen: onListen, |
| 495 onCancel: () { subscription.cancel(); }, | 490 onCancel: () { |
| 496 sync: true | 491 subscription.cancel(); |
| 497 ); | 492 }, |
| 493 sync: true); |
| 498 } else { | 494 } else { |
| 499 controller = new StreamController<E>( | 495 controller = new StreamController<E>( |
| 500 onListen: onListen, | 496 onListen: onListen, |
| 501 onPause: () { subscription.pause(); }, | 497 onPause: () { |
| 502 onResume: () { subscription.resume(); }, | 498 subscription.pause(); |
| 503 onCancel: () => subscription.cancel(), | 499 }, |
| 504 sync: true | 500 onResume: () { |
| 505 ); | 501 subscription.resume(); |
| 502 }, |
| 503 onCancel: () => subscription.cancel(), |
| 504 sync: true); |
| 506 } | 505 } |
| 507 return controller.stream; | 506 return controller.stream; |
| 508 } | 507 } |
| 509 | 508 |
| 510 /** | 509 /** |
| 511 * Creates a wrapper Stream that intercepts some errors from this stream. | 510 * Creates a wrapper Stream that intercepts some errors from this stream. |
| 512 * | 511 * |
| 513 * If this stream sends an error that matches [test], then it is intercepted | 512 * If this stream sends an error that matches [test], then it is intercepted |
| 514 * by the [onError] function. | 513 * by the [onError] function. |
| 515 * | 514 * |
| (...skipping 12 matching lines...) Expand all Loading... |
| 528 * or simply return to make the stream forget the error. | 527 * or simply return to make the stream forget the error. |
| 529 * | 528 * |
| 530 * If you need to transform an error into a data event, use the more generic | 529 * If you need to transform an error into a data event, use the more generic |
| 531 * [Stream.transform] to handle the event by writing a data event to | 530 * [Stream.transform] to handle the event by writing a data event to |
| 532 * the output sink. | 531 * the output sink. |
| 533 * | 532 * |
| 534 * The returned stream is a broadcast stream if this stream is. | 533 * The returned stream is a broadcast stream if this stream is. |
| 535 * If a broadcast stream is listened to more than once, each subscription | 534 * If a broadcast stream is listened to more than once, each subscription |
| 536 * will individually perform the `test` and handle the error. | 535 * will individually perform the `test` and handle the error. |
| 537 */ | 536 */ |
| 538 Stream<T> handleError(Function onError, { bool test(error) }) { | 537 Stream<T> handleError(Function onError, {bool test(error)}) { |
| 539 return new _HandleErrorStream<T>(this, onError, test); | 538 return new _HandleErrorStream<T>(this, onError, test); |
| 540 } | 539 } |
| 541 | 540 |
| 542 /** | 541 /** |
| 543 * Creates a new stream from this stream that converts each element | 542 * Creates a new stream from this stream that converts each element |
| 544 * into zero or more events. | 543 * into zero or more events. |
| 545 * | 544 * |
| 546 * Each incoming event is converted to an [Iterable] of new events, | 545 * Each incoming event is converted to an [Iterable] of new events, |
| 547 * and each of these new events are then sent by the returned stream | 546 * and each of these new events are then sent by the returned stream |
| 548 * in order. | 547 * in order. |
| (...skipping 29 matching lines...) Expand all Loading... |
| 578 } | 577 } |
| 579 | 578 |
| 580 /** | 579 /** |
| 581 * Chains this stream as the input of the provided [StreamTransformer]. | 580 * Chains this stream as the input of the provided [StreamTransformer]. |
| 582 * | 581 * |
| 583 * Returns the result of [:streamTransformer.bind:] itself. | 582 * Returns the result of [:streamTransformer.bind:] itself. |
| 584 * | 583 * |
| 585 * The `streamTransformer` can decide whether it wants to return a | 584 * The `streamTransformer` can decide whether it wants to return a |
| 586 * broadcast stream or not. | 585 * broadcast stream or not. |
| 587 */ | 586 */ |
| 588 Stream<S> transform<S>( | 587 Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) { |
| 589 StreamTransformer<T, S > streamTransformer) { | |
| 590 return streamTransformer.bind(this); | 588 return streamTransformer.bind(this); |
| 591 } | 589 } |
| 592 | 590 |
| 593 /** | 591 /** |
| 594 * Reduces a sequence of values by repeatedly applying [combine]. | 592 * Reduces a sequence of values by repeatedly applying [combine]. |
| 595 */ | 593 */ |
| 596 Future<T> reduce(T combine(T previous, T element)) { | 594 Future<T> reduce(T combine(T previous, T element)) { |
| 597 _Future<T> result = new _Future<T>(); | 595 _Future<T> result = new _Future<T>(); |
| 598 bool seenFirst = false; | 596 bool seenFirst = false; |
| 599 T value; | 597 T value; |
| 600 StreamSubscription subscription; | 598 StreamSubscription subscription; |
| 601 subscription = this.listen( | 599 subscription = this.listen( |
| 602 (T element) { | 600 (T element) { |
| 603 if (seenFirst) { | 601 if (seenFirst) { |
| 604 _runUserCode(() => combine(value, element), | 602 _runUserCode(() => combine(value, element), (T newValue) { |
| 605 (T newValue) { value = newValue; }, | 603 value = newValue; |
| 606 _cancelAndErrorClosure(subscription, result)); | 604 }, _cancelAndErrorClosure(subscription, result)); |
| 607 } else { | 605 } else { |
| 608 value = element; | 606 value = element; |
| 609 seenFirst = true; | 607 seenFirst = true; |
| 610 } | |
| 611 }, | |
| 612 onError: result._completeError, | |
| 613 onDone: () { | |
| 614 if (!seenFirst) { | |
| 615 try { | |
| 616 throw IterableElementError.noElement(); | |
| 617 } catch (e, s) { | |
| 618 _completeWithErrorCallback(result, e, s); | |
| 619 } | 608 } |
| 620 } else { | 609 }, |
| 621 result._complete(value); | 610 onError: result._completeError, |
| 622 } | 611 onDone: () { |
| 623 }, | 612 if (!seenFirst) { |
| 624 cancelOnError: true | 613 try { |
| 625 ); | 614 throw IterableElementError.noElement(); |
| 615 } catch (e, s) { |
| 616 _completeWithErrorCallback(result, e, s); |
| 617 } |
| 618 } else { |
| 619 result._complete(value); |
| 620 } |
| 621 }, |
| 622 cancelOnError: true); |
| 626 return result; | 623 return result; |
| 627 } | 624 } |
| 628 | 625 |
| 629 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 626 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 630 Future<S> fold<S>(S initialValue, | 627 Future<S> fold<S>(S initialValue, S combine(S previous, T element)) { |
| 631 S combine(S previous, T element)) { | |
| 632 | |
| 633 _Future<S> result = new _Future<S>(); | 628 _Future<S> result = new _Future<S>(); |
| 634 S value = initialValue; | 629 S value = initialValue; |
| 635 StreamSubscription subscription; | 630 StreamSubscription subscription; |
| 636 subscription = this.listen( | 631 subscription = this.listen((T element) { |
| 637 (T element) { | 632 _runUserCode(() => combine(value, element), (S newValue) { |
| 638 _runUserCode( | 633 value = newValue; |
| 639 () => combine(value, element), | 634 }, _cancelAndErrorClosure(subscription, result)); |
| 640 (S newValue) { value = newValue; }, | 635 }, onError: (e, st) { |
| 641 _cancelAndErrorClosure(subscription, result) | 636 result._completeError(e, st); |
| 642 ); | 637 }, onDone: () { |
| 643 }, | 638 result._complete(value); |
| 644 onError: (e, st) { | 639 }, cancelOnError: true); |
| 645 result._completeError(e, st); | |
| 646 }, | |
| 647 onDone: () { | |
| 648 result._complete(value); | |
| 649 }, | |
| 650 cancelOnError: true); | |
| 651 return result; | 640 return result; |
| 652 } | 641 } |
| 653 | 642 |
| 654 /** | 643 /** |
| 655 * Collects string of data events' string representations. | 644 * Collects string of data events' string representations. |
| 656 * | 645 * |
| 657 * If [separator] is provided, it is inserted between any two | 646 * If [separator] is provided, it is inserted between any two |
| 658 * elements. | 647 * elements. |
| 659 * | 648 * |
| 660 * Any error in the stream causes the future to complete with that | 649 * Any error in the stream causes the future to complete with that |
| 661 * error. Otherwise it completes with the collected string when | 650 * error. Otherwise it completes with the collected string when |
| 662 * the "done" event arrives. | 651 * the "done" event arrives. |
| 663 */ | 652 */ |
| 664 Future<String> join([String separator = ""]) { | 653 Future<String> join([String separator = ""]) { |
| 665 _Future<String> result = new _Future<String>(); | 654 _Future<String> result = new _Future<String>(); |
| 666 StringBuffer buffer = new StringBuffer(); | 655 StringBuffer buffer = new StringBuffer(); |
| 667 StreamSubscription subscription; | 656 StreamSubscription subscription; |
| 668 bool first = true; | 657 bool first = true; |
| 669 subscription = this.listen( | 658 subscription = this.listen((T element) { |
| 670 (T element) { | 659 if (!first) { |
| 671 if (!first) { | 660 buffer.write(separator); |
| 672 buffer.write(separator); | 661 } |
| 673 } | 662 first = false; |
| 674 first = false; | 663 try { |
| 675 try { | 664 buffer.write(element); |
| 676 buffer.write(element); | 665 } catch (e, s) { |
| 677 } catch (e, s) { | 666 _cancelAndErrorWithReplacement(subscription, result, e, s); |
| 678 _cancelAndErrorWithReplacement(subscription, result, e, s); | 667 } |
| 679 } | 668 }, onError: (e) { |
| 680 }, | 669 result._completeError(e); |
| 681 onError: (e) { | 670 }, onDone: () { |
| 682 result._completeError(e); | 671 result._complete(buffer.toString()); |
| 683 }, | 672 }, cancelOnError: true); |
| 684 onDone: () { | |
| 685 result._complete(buffer.toString()); | |
| 686 }, | |
| 687 cancelOnError: true); | |
| 688 return result; | 673 return result; |
| 689 } | 674 } |
| 690 | 675 |
| 691 /** | 676 /** |
| 692 * Checks whether [needle] occurs in the elements provided by this stream. | 677 * Checks whether [needle] occurs in the elements provided by this stream. |
| 693 * | 678 * |
| 694 * Completes the [Future] when the answer is known. | 679 * Completes the [Future] when the answer is known. |
| 695 * If this stream reports an error, the [Future] will report that error. | 680 * If this stream reports an error, the [Future] will report that error. |
| 696 */ | 681 */ |
| 697 Future<bool> contains(Object needle) { | 682 Future<bool> contains(Object needle) { |
| 698 _Future<bool> future = new _Future<bool>(); | 683 _Future<bool> future = new _Future<bool>(); |
| 699 StreamSubscription subscription; | 684 StreamSubscription subscription; |
| 700 subscription = this.listen( | 685 subscription = this.listen( |
| 701 (T element) { | 686 (T element) { |
| 702 _runUserCode( | 687 _runUserCode(() => (element == needle), (bool isMatch) { |
| 703 () => (element == needle), | 688 if (isMatch) { |
| 704 (bool isMatch) { | 689 _cancelAndValue(subscription, future, true); |
| 705 if (isMatch) { | 690 } |
| 706 _cancelAndValue(subscription, future, true); | 691 }, _cancelAndErrorClosure(subscription, future)); |
| 707 } | |
| 708 }, | |
| 709 _cancelAndErrorClosure(subscription, future) | |
| 710 ); | |
| 711 }, | 692 }, |
| 712 onError: future._completeError, | 693 onError: future._completeError, |
| 713 onDone: () { | 694 onDone: () { |
| 714 future._complete(false); | 695 future._complete(false); |
| 715 }, | 696 }, |
| 716 cancelOnError: true); | 697 cancelOnError: true); |
| 717 return future; | 698 return future; |
| 718 } | 699 } |
| 719 | 700 |
| 720 /** | 701 /** |
| 721 * Executes [action] on each data event of the stream. | 702 * Executes [action] on each data event of the stream. |
| 722 * | 703 * |
| 723 * Completes the returned [Future] when all events of the stream | 704 * Completes the returned [Future] when all events of the stream |
| 724 * have been processed. Completes the future with an error if the | 705 * have been processed. Completes the future with an error if the |
| 725 * stream has an error event, or if [action] throws. | 706 * stream has an error event, or if [action] throws. |
| 726 */ | 707 */ |
| 727 Future forEach(void action(T element)) { | 708 Future forEach(void action(T element)) { |
| 728 _Future future = new _Future(); | 709 _Future future = new _Future(); |
| 729 StreamSubscription subscription; | 710 StreamSubscription subscription; |
| 730 subscription = this.listen( | 711 subscription = this.listen( |
| 731 (T element) { | 712 (T element) { |
| 732 _runUserCode( | 713 _runUserCode(() => action(element), (_) {}, |
| 733 () => action(element), | 714 _cancelAndErrorClosure(subscription, future)); |
| 734 (_) {}, | |
| 735 _cancelAndErrorClosure(subscription, future) | |
| 736 ); | |
| 737 }, | 715 }, |
| 738 onError: future._completeError, | 716 onError: future._completeError, |
| 739 onDone: () { | 717 onDone: () { |
| 740 future._complete(null); | 718 future._complete(null); |
| 741 }, | 719 }, |
| 742 cancelOnError: true); | 720 cancelOnError: true); |
| 743 return future; | 721 return future; |
| 744 } | 722 } |
| 745 | 723 |
| 746 /** | 724 /** |
| 747 * Checks whether [test] accepts all elements provided by this stream. | 725 * Checks whether [test] accepts all elements provided by this stream. |
| 748 * | 726 * |
| 749 * Completes the [Future] when the answer is known. | 727 * Completes the [Future] when the answer is known. |
| 750 * If this stream reports an error, the [Future] will report that error. | 728 * If this stream reports an error, the [Future] will report that error. |
| 751 */ | 729 */ |
| 752 Future<bool> every(bool test(T element)) { | 730 Future<bool> every(bool test(T element)) { |
| 753 _Future<bool> future = new _Future<bool>(); | 731 _Future<bool> future = new _Future<bool>(); |
| 754 StreamSubscription subscription; | 732 StreamSubscription subscription; |
| 755 subscription = this.listen( | 733 subscription = this.listen( |
| 756 (T element) { | 734 (T element) { |
| 757 _runUserCode( | 735 _runUserCode(() => test(element), (bool isMatch) { |
| 758 () => test(element), | 736 if (!isMatch) { |
| 759 (bool isMatch) { | 737 _cancelAndValue(subscription, future, false); |
| 760 if (!isMatch) { | 738 } |
| 761 _cancelAndValue(subscription, future, false); | 739 }, _cancelAndErrorClosure(subscription, future)); |
| 762 } | |
| 763 }, | |
| 764 _cancelAndErrorClosure(subscription, future) | |
| 765 ); | |
| 766 }, | 740 }, |
| 767 onError: future._completeError, | 741 onError: future._completeError, |
| 768 onDone: () { | 742 onDone: () { |
| 769 future._complete(true); | 743 future._complete(true); |
| 770 }, | 744 }, |
| 771 cancelOnError: true); | 745 cancelOnError: true); |
| 772 return future; | 746 return future; |
| 773 } | 747 } |
| 774 | 748 |
| 775 /** | 749 /** |
| 776 * Checks whether [test] accepts any element provided by this stream. | 750 * Checks whether [test] accepts any element provided by this stream. |
| 777 * | 751 * |
| 778 * Completes the [Future] when the answer is known. | 752 * Completes the [Future] when the answer is known. |
| 779 * | 753 * |
| 780 * If this stream reports an error, the [Future] reports that error. | 754 * If this stream reports an error, the [Future] reports that error. |
| 781 * | 755 * |
| 782 * Stops listening to the stream after the first matching element has been | 756 * Stops listening to the stream after the first matching element has been |
| 783 * found. | 757 * found. |
| 784 * | 758 * |
| 785 * Internally the method cancels its subscription after this element. This | 759 * Internally the method cancels its subscription after this element. This |
| 786 * means that single-subscription (non-broadcast) streams are closed and | 760 * means that single-subscription (non-broadcast) streams are closed and |
| 787 * cannot be reused after a call to this method. | 761 * cannot be reused after a call to this method. |
| 788 */ | 762 */ |
| 789 Future<bool> any(bool test(T element)) { | 763 Future<bool> any(bool test(T element)) { |
| 790 _Future<bool> future = new _Future<bool>(); | 764 _Future<bool> future = new _Future<bool>(); |
| 791 StreamSubscription subscription; | 765 StreamSubscription subscription; |
| 792 subscription = this.listen( | 766 subscription = this.listen( |
| 793 (T element) { | 767 (T element) { |
| 794 _runUserCode( | 768 _runUserCode(() => test(element), (bool isMatch) { |
| 795 () => test(element), | 769 if (isMatch) { |
| 796 (bool isMatch) { | 770 _cancelAndValue(subscription, future, true); |
| 797 if (isMatch) { | 771 } |
| 798 _cancelAndValue(subscription, future, true); | 772 }, _cancelAndErrorClosure(subscription, future)); |
| 799 } | |
| 800 }, | |
| 801 _cancelAndErrorClosure(subscription, future) | |
| 802 ); | |
| 803 }, | 773 }, |
| 804 onError: future._completeError, | 774 onError: future._completeError, |
| 805 onDone: () { | 775 onDone: () { |
| 806 future._complete(false); | 776 future._complete(false); |
| 807 }, | 777 }, |
| 808 cancelOnError: true); | 778 cancelOnError: true); |
| 809 return future; | 779 return future; |
| 810 } | 780 } |
| 811 | 781 |
| 812 | |
| 813 /** Counts the elements in the stream. */ | 782 /** Counts the elements in the stream. */ |
| 814 Future<int> get length { | 783 Future<int> get length { |
| 815 _Future<int> future = new _Future<int>(); | 784 _Future<int> future = new _Future<int>(); |
| 816 int count = 0; | 785 int count = 0; |
| 817 this.listen( | 786 this.listen( |
| 818 (_) { count++; }, | 787 (_) { |
| 819 onError: future._completeError, | 788 count++; |
| 820 onDone: () { | 789 }, |
| 821 future._complete(count); | 790 onError: future._completeError, |
| 822 }, | 791 onDone: () { |
| 823 cancelOnError: true); | 792 future._complete(count); |
| 793 }, |
| 794 cancelOnError: true); |
| 824 return future; | 795 return future; |
| 825 } | 796 } |
| 826 | 797 |
| 827 /** | 798 /** |
| 828 * Reports whether this stream contains any elements. | 799 * Reports whether this stream contains any elements. |
| 829 * | 800 * |
| 830 * Stops listening to the stream after the first element has been received. | 801 * Stops listening to the stream after the first element has been received. |
| 831 * | 802 * |
| 832 * Internally the method cancels its subscription after the first element. | 803 * Internally the method cancels its subscription after the first element. |
| 833 * This means that single-subscription (non-broadcast) streams are closed and | 804 * This means that single-subscription (non-broadcast) streams are closed and |
| 834 * cannot be reused after a call to this getter. | 805 * cannot be reused after a call to this getter. |
| 835 */ | 806 */ |
| 836 Future<bool> get isEmpty { | 807 Future<bool> get isEmpty { |
| 837 _Future<bool> future = new _Future<bool>(); | 808 _Future<bool> future = new _Future<bool>(); |
| 838 StreamSubscription subscription; | 809 StreamSubscription subscription; |
| 839 subscription = this.listen( | 810 subscription = this.listen( |
| 840 (_) { | 811 (_) { |
| 841 _cancelAndValue(subscription, future, false); | 812 _cancelAndValue(subscription, future, false); |
| 842 }, | 813 }, |
| 843 onError: future._completeError, | 814 onError: future._completeError, |
| 844 onDone: () { | 815 onDone: () { |
| 845 future._complete(true); | 816 future._complete(true); |
| 846 }, | 817 }, |
| 847 cancelOnError: true); | 818 cancelOnError: true); |
| 848 return future; | 819 return future; |
| 849 } | 820 } |
| 850 | 821 |
| 851 /** Collects the data of this stream in a [List]. */ | 822 /** Collects the data of this stream in a [List]. */ |
| 852 Future<List<T>> toList() { | 823 Future<List<T>> toList() { |
| 853 List<T> result = <T>[]; | 824 List<T> result = <T>[]; |
| 854 _Future<List<T>> future = new _Future<List<T>>(); | 825 _Future<List<T>> future = new _Future<List<T>>(); |
| 855 this.listen( | 826 this.listen( |
| 856 (T data) { | 827 (T data) { |
| 857 result.add(data); | 828 result.add(data); |
| 858 }, | 829 }, |
| 859 onError: future._completeError, | 830 onError: future._completeError, |
| 860 onDone: () { | 831 onDone: () { |
| 861 future._complete(result); | 832 future._complete(result); |
| 862 }, | 833 }, |
| 863 cancelOnError: true); | 834 cancelOnError: true); |
| 864 return future; | 835 return future; |
| 865 } | 836 } |
| 866 | 837 |
| 867 /** | 838 /** |
| 868 * Collects the data of this stream in a [Set]. | 839 * Collects the data of this stream in a [Set]. |
| 869 * | 840 * |
| 870 * The returned set is the same type as returned by `new Set<T>()`. | 841 * The returned set is the same type as returned by `new Set<T>()`. |
| 871 * If another type of set is needed, either use [forEach] to add each | 842 * If another type of set is needed, either use [forEach] to add each |
| 872 * element to the set, or use | 843 * element to the set, or use |
| 873 * `toList().then((list) => new SomeOtherSet.from(list))` | 844 * `toList().then((list) => new SomeOtherSet.from(list))` |
| 874 * to create the set. | 845 * to create the set. |
| 875 */ | 846 */ |
| 876 Future<Set<T>> toSet() { | 847 Future<Set<T>> toSet() { |
| 877 Set<T> result = new Set<T>(); | 848 Set<T> result = new Set<T>(); |
| 878 _Future<Set<T>> future = new _Future<Set<T>>(); | 849 _Future<Set<T>> future = new _Future<Set<T>>(); |
| 879 this.listen( | 850 this.listen( |
| 880 (T data) { | 851 (T data) { |
| 881 result.add(data); | 852 result.add(data); |
| 882 }, | 853 }, |
| 883 onError: future._completeError, | 854 onError: future._completeError, |
| 884 onDone: () { | 855 onDone: () { |
| 885 future._complete(result); | 856 future._complete(result); |
| 886 }, | 857 }, |
| 887 cancelOnError: true); | 858 cancelOnError: true); |
| 888 return future; | 859 return future; |
| 889 } | 860 } |
| 890 | 861 |
| 891 /** | 862 /** |
| 892 * Discards all data on the stream, but signals when it's done or an error | 863 * Discards all data on the stream, but signals when it's done or an error |
| 893 * occurred. | 864 * occurred. |
| 894 * | 865 * |
| 895 * When subscribing using [drain], cancelOnError will be true. This means | 866 * When subscribing using [drain], cancelOnError will be true. This means |
| 896 * that the future will complete with the first error on the stream and then | 867 * that the future will complete with the first error on the stream and then |
| 897 * cancel the subscription. | 868 * cancel the subscription. |
| 898 * | 869 * |
| 899 * In case of a `done` event the future completes with the given | 870 * In case of a `done` event the future completes with the given |
| 900 * [futureValue]. | 871 * [futureValue]. |
| 901 */ | 872 */ |
| 902 Future<E> drain<E>([E futureValue]) | 873 Future<E> drain<E>([E futureValue]) => listen(null, cancelOnError: true) |
| 903 => listen(null, cancelOnError: true).asFuture<E>(futureValue); | 874 .asFuture<E>(futureValue); |
| 904 | 875 |
| 905 /** | 876 /** |
| 906 * Provides at most the first [count] data events of this stream. | 877 * Provides at most the first [count] data events of this stream. |
| 907 * | 878 * |
| 908 * Forwards all events of this stream to the returned stream | 879 * Forwards all events of this stream to the returned stream |
| 909 * until [count] data events have been forwarded or this stream ends, | 880 * until [count] data events have been forwarded or this stream ends, |
| 910 * then ends the returned stream with a done event. | 881 * then ends the returned stream with a done event. |
| 911 * | 882 * |
| 912 * If this stream produces fewer than [count] data events before it's done, | 883 * If this stream produces fewer than [count] data events before it's done, |
| 913 * so will the returned stream. | 884 * so will the returned stream. |
| (...skipping 93 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1007 * If this stream is empty (a done event occurs before the first data event), | 978 * If this stream is empty (a done event occurs before the first data event), |
| 1008 * the resulting future completes with a [StateError]. | 979 * the resulting future completes with a [StateError]. |
| 1009 * | 980 * |
| 1010 * Except for the type of the error, this method is equivalent to | 981 * Except for the type of the error, this method is equivalent to |
| 1011 * [:this.elementAt(0):]. | 982 * [:this.elementAt(0):]. |
| 1012 */ | 983 */ |
| 1013 Future<T> get first { | 984 Future<T> get first { |
| 1014 _Future<T> future = new _Future<T>(); | 985 _Future<T> future = new _Future<T>(); |
| 1015 StreamSubscription subscription; | 986 StreamSubscription subscription; |
| 1016 subscription = this.listen( | 987 subscription = this.listen( |
| 1017 (T value) { | 988 (T value) { |
| 1018 _cancelAndValue(subscription, future, value); | 989 _cancelAndValue(subscription, future, value); |
| 1019 }, | 990 }, |
| 1020 onError: future._completeError, | 991 onError: future._completeError, |
| 1021 onDone: () { | 992 onDone: () { |
| 1022 try { | 993 try { |
| 1023 throw IterableElementError.noElement(); | 994 throw IterableElementError.noElement(); |
| 1024 } catch (e, s) { | 995 } catch (e, s) { |
| 1025 _completeWithErrorCallback(future, e, s); | 996 _completeWithErrorCallback(future, e, s); |
| 1026 } | 997 } |
| 1027 }, | 998 }, |
| 1028 cancelOnError: true); | 999 cancelOnError: true); |
| 1029 return future; | 1000 return future; |
| 1030 } | 1001 } |
| 1031 | 1002 |
| 1032 /** | 1003 /** |
| 1033 * Returns the last element of the stream. | 1004 * Returns the last element of the stream. |
| 1034 * | 1005 * |
| 1035 * If an error event occurs before the first data event, the resulting future | 1006 * If an error event occurs before the first data event, the resulting future |
| 1036 * is completed with that error. | 1007 * is completed with that error. |
| 1037 * | 1008 * |
| 1038 * If this stream is empty (a done event occurs before the first data event), | 1009 * If this stream is empty (a done event occurs before the first data event), |
| 1039 * the resulting future completes with a [StateError]. | 1010 * the resulting future completes with a [StateError]. |
| 1040 */ | 1011 */ |
| 1041 Future<T> get last { | 1012 Future<T> get last { |
| 1042 _Future<T> future = new _Future<T>(); | 1013 _Future<T> future = new _Future<T>(); |
| 1043 T result = null; | 1014 T result = null; |
| 1044 bool foundResult = false; | 1015 bool foundResult = false; |
| 1045 listen( | 1016 listen( |
| 1046 (T value) { | 1017 (T value) { |
| 1047 foundResult = true; | 1018 foundResult = true; |
| 1048 result = value; | 1019 result = value; |
| 1049 }, | 1020 }, |
| 1050 onError: future._completeError, | 1021 onError: future._completeError, |
| 1051 onDone: () { | 1022 onDone: () { |
| 1052 if (foundResult) { | 1023 if (foundResult) { |
| 1053 future._complete(result); | 1024 future._complete(result); |
| 1054 return; | 1025 return; |
| 1055 } | 1026 } |
| 1056 try { | 1027 try { |
| 1057 throw IterableElementError.noElement(); | 1028 throw IterableElementError.noElement(); |
| 1058 } catch (e, s) { | 1029 } catch (e, s) { |
| 1059 _completeWithErrorCallback(future, e, s); | 1030 _completeWithErrorCallback(future, e, s); |
| 1060 } | 1031 } |
| 1061 }, | 1032 }, |
| 1062 cancelOnError: true); | 1033 cancelOnError: true); |
| 1063 return future; | 1034 return future; |
| 1064 } | 1035 } |
| 1065 | 1036 |
| 1066 /** | 1037 /** |
| 1067 * Returns the single element. | 1038 * Returns the single element. |
| 1068 * | 1039 * |
| 1069 * If an error event occurs before or after the first data event, the | 1040 * If an error event occurs before or after the first data event, the |
| 1070 * resulting future is completed with that error. | 1041 * resulting future is completed with that error. |
| 1071 * | 1042 * |
| 1072 * If [this] is empty or has more than one element throws a [StateError]. | 1043 * If [this] is empty or has more than one element throws a [StateError]. |
| 1073 */ | 1044 */ |
| 1074 Future<T> get single { | 1045 Future<T> get single { |
| 1075 _Future<T> future = new _Future<T>(); | 1046 _Future<T> future = new _Future<T>(); |
| 1076 T result = null; | 1047 T result = null; |
| 1077 bool foundResult = false; | 1048 bool foundResult = false; |
| 1078 StreamSubscription subscription; | 1049 StreamSubscription subscription; |
| 1079 subscription = this.listen( | 1050 subscription = this.listen( |
| 1080 (T value) { | 1051 (T value) { |
| 1081 if (foundResult) { | 1052 if (foundResult) { |
| 1082 // This is the second element we get. | 1053 // This is the second element we get. |
| 1054 try { |
| 1055 throw IterableElementError.tooMany(); |
| 1056 } catch (e, s) { |
| 1057 _cancelAndErrorWithReplacement(subscription, future, e, s); |
| 1058 } |
| 1059 return; |
| 1060 } |
| 1061 foundResult = true; |
| 1062 result = value; |
| 1063 }, |
| 1064 onError: future._completeError, |
| 1065 onDone: () { |
| 1066 if (foundResult) { |
| 1067 future._complete(result); |
| 1068 return; |
| 1069 } |
| 1083 try { | 1070 try { |
| 1084 throw IterableElementError.tooMany(); | 1071 throw IterableElementError.noElement(); |
| 1085 } catch (e, s) { | 1072 } catch (e, s) { |
| 1086 _cancelAndErrorWithReplacement(subscription, future, e, s); | 1073 _completeWithErrorCallback(future, e, s); |
| 1087 } | 1074 } |
| 1088 return; | 1075 }, |
| 1089 } | 1076 cancelOnError: true); |
| 1090 foundResult = true; | |
| 1091 result = value; | |
| 1092 }, | |
| 1093 onError: future._completeError, | |
| 1094 onDone: () { | |
| 1095 if (foundResult) { | |
| 1096 future._complete(result); | |
| 1097 return; | |
| 1098 } | |
| 1099 try { | |
| 1100 throw IterableElementError.noElement(); | |
| 1101 } catch (e, s) { | |
| 1102 _completeWithErrorCallback(future, e, s); | |
| 1103 } | |
| 1104 }, | |
| 1105 cancelOnError: true); | |
| 1106 return future; | 1077 return future; |
| 1107 } | 1078 } |
| 1108 | 1079 |
| 1109 /** | 1080 /** |
| 1110 * Finds the first element of this stream matching [test]. | 1081 * Finds the first element of this stream matching [test]. |
| 1111 * | 1082 * |
| 1112 * Returns a future that is filled with the first element of this stream | 1083 * Returns a future that is filled with the first element of this stream |
| 1113 * that [test] returns true for. | 1084 * that [test] returns true for. |
| 1114 * | 1085 * |
| 1115 * If no such element is found before this stream is done, and a | 1086 * If no such element is found before this stream is done, and a |
| 1116 * [defaultValue] function is provided, the result of calling [defaultValue] | 1087 * [defaultValue] function is provided, the result of calling [defaultValue] |
| 1117 * becomes the value of the future. | 1088 * becomes the value of the future. |
| 1118 * | 1089 * |
| 1119 * Stops listening to the stream after the first matching element has been | 1090 * Stops listening to the stream after the first matching element has been |
| 1120 * received. | 1091 * received. |
| 1121 * | 1092 * |
| 1122 * Internally the method cancels its subscription after the first element that | 1093 * Internally the method cancels its subscription after the first element that |
| 1123 * matches the predicate. This means that single-subscription (non-broadcast) | 1094 * matches the predicate. This means that single-subscription (non-broadcast) |
| 1124 * streams are closed and cannot be reused after a call to this method. | 1095 * streams are closed and cannot be reused after a call to this method. |
| 1125 * | 1096 * |
| 1126 * If an error occurs, or if this stream ends without finding a match and | 1097 * If an error occurs, or if this stream ends without finding a match and |
| 1127 * with no [defaultValue] function provided, the future will receive an | 1098 * with no [defaultValue] function provided, the future will receive an |
| 1128 * error. | 1099 * error. |
| 1129 */ | 1100 */ |
| 1130 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | 1101 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { |
| 1131 _Future<dynamic> future = new _Future(); | 1102 _Future<dynamic> future = new _Future(); |
| 1132 StreamSubscription subscription; | 1103 StreamSubscription subscription; |
| 1133 subscription = this.listen( | 1104 subscription = this.listen( |
| 1134 (T value) { | 1105 (T value) { |
| 1135 _runUserCode( | 1106 _runUserCode(() => test(value), (bool isMatch) { |
| 1136 () => test(value), | |
| 1137 (bool isMatch) { | |
| 1138 if (isMatch) { | 1107 if (isMatch) { |
| 1139 _cancelAndValue(subscription, future, value); | 1108 _cancelAndValue(subscription, future, value); |
| 1140 } | 1109 } |
| 1141 }, | 1110 }, _cancelAndErrorClosure(subscription, future)); |
| 1142 _cancelAndErrorClosure(subscription, future) | 1111 }, |
| 1143 ); | 1112 onError: future._completeError, |
| 1144 }, | 1113 onDone: () { |
| 1145 onError: future._completeError, | 1114 if (defaultValue != null) { |
| 1146 onDone: () { | 1115 _runUserCode(defaultValue, future._complete, future._completeError); |
| 1147 if (defaultValue != null) { | 1116 return; |
| 1148 _runUserCode(defaultValue, future._complete, future._completeError); | 1117 } |
| 1149 return; | 1118 try { |
| 1150 } | 1119 throw IterableElementError.noElement(); |
| 1151 try { | 1120 } catch (e, s) { |
| 1152 throw IterableElementError.noElement(); | 1121 _completeWithErrorCallback(future, e, s); |
| 1153 } catch (e, s) { | 1122 } |
| 1154 _completeWithErrorCallback(future, e, s); | 1123 }, |
| 1155 } | 1124 cancelOnError: true); |
| 1156 }, | |
| 1157 cancelOnError: true); | |
| 1158 return future; | 1125 return future; |
| 1159 } | 1126 } |
| 1160 | 1127 |
| 1161 /** | 1128 /** |
| 1162 * Finds the last element in this stream matching [test]. | 1129 * Finds the last element in this stream matching [test]. |
| 1163 * | 1130 * |
| 1164 * As [firstWhere], except that the last matching element is found. | 1131 * As [firstWhere], except that the last matching element is found. |
| 1165 * That means that the result cannot be provided before this stream | 1132 * That means that the result cannot be provided before this stream |
| 1166 * is done. | 1133 * is done. |
| 1167 */ | 1134 */ |
| 1168 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | 1135 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { |
| 1169 _Future<dynamic> future = new _Future(); | 1136 _Future<dynamic> future = new _Future(); |
| 1170 T result = null; | 1137 T result = null; |
| 1171 bool foundResult = false; | 1138 bool foundResult = false; |
| 1172 StreamSubscription subscription; | 1139 StreamSubscription subscription; |
| 1173 subscription = this.listen( | 1140 subscription = this.listen( |
| 1174 (T value) { | 1141 (T value) { |
| 1175 _runUserCode( | 1142 _runUserCode(() => true == test(value), (bool isMatch) { |
| 1176 () => true == test(value), | |
| 1177 (bool isMatch) { | |
| 1178 if (isMatch) { | 1143 if (isMatch) { |
| 1179 foundResult = true; | 1144 foundResult = true; |
| 1180 result = value; | 1145 result = value; |
| 1181 } | 1146 } |
| 1182 }, | 1147 }, _cancelAndErrorClosure(subscription, future)); |
| 1183 _cancelAndErrorClosure(subscription, future) | 1148 }, |
| 1184 ); | 1149 onError: future._completeError, |
| 1185 }, | 1150 onDone: () { |
| 1186 onError: future._completeError, | 1151 if (foundResult) { |
| 1187 onDone: () { | 1152 future._complete(result); |
| 1188 if (foundResult) { | 1153 return; |
| 1189 future._complete(result); | 1154 } |
| 1190 return; | 1155 if (defaultValue != null) { |
| 1191 } | 1156 _runUserCode(defaultValue, future._complete, future._completeError); |
| 1192 if (defaultValue != null) { | 1157 return; |
| 1193 _runUserCode(defaultValue, future._complete, future._completeError); | 1158 } |
| 1194 return; | 1159 try { |
| 1195 } | 1160 throw IterableElementError.noElement(); |
| 1196 try { | 1161 } catch (e, s) { |
| 1197 throw IterableElementError.noElement(); | 1162 _completeWithErrorCallback(future, e, s); |
| 1198 } catch (e, s) { | 1163 } |
| 1199 _completeWithErrorCallback(future, e, s); | 1164 }, |
| 1200 } | 1165 cancelOnError: true); |
| 1201 }, | |
| 1202 cancelOnError: true); | |
| 1203 return future; | 1166 return future; |
| 1204 } | 1167 } |
| 1205 | 1168 |
| 1206 /** | 1169 /** |
| 1207 * Finds the single element in this stream matching [test]. | 1170 * Finds the single element in this stream matching [test]. |
| 1208 * | 1171 * |
| 1209 * Like [lastMatch], except that it is an error if more than one | 1172 * Like [lastMatch], except that it is an error if more than one |
| 1210 * matching element occurs in the stream. | 1173 * matching element occurs in the stream. |
| 1211 */ | 1174 */ |
| 1212 Future<T> singleWhere(bool test(T element)) { | 1175 Future<T> singleWhere(bool test(T element)) { |
| 1213 _Future<T> future = new _Future<T>(); | 1176 _Future<T> future = new _Future<T>(); |
| 1214 T result = null; | 1177 T result = null; |
| 1215 bool foundResult = false; | 1178 bool foundResult = false; |
| 1216 StreamSubscription subscription; | 1179 StreamSubscription subscription; |
| 1217 subscription = this.listen( | 1180 subscription = this.listen( |
| 1218 (T value) { | 1181 (T value) { |
| 1219 _runUserCode( | 1182 _runUserCode(() => true == test(value), (bool isMatch) { |
| 1220 () => true == test(value), | |
| 1221 (bool isMatch) { | |
| 1222 if (isMatch) { | 1183 if (isMatch) { |
| 1223 if (foundResult) { | 1184 if (foundResult) { |
| 1224 try { | 1185 try { |
| 1225 throw IterableElementError.tooMany(); | 1186 throw IterableElementError.tooMany(); |
| 1226 } catch (e, s) { | 1187 } catch (e, s) { |
| 1227 _cancelAndErrorWithReplacement(subscription, future, e, s); | 1188 _cancelAndErrorWithReplacement(subscription, future, e, s); |
| 1228 } | 1189 } |
| 1229 return; | 1190 return; |
| 1230 } | 1191 } |
| 1231 foundResult = true; | 1192 foundResult = true; |
| 1232 result = value; | 1193 result = value; |
| 1233 } | 1194 } |
| 1234 }, | 1195 }, _cancelAndErrorClosure(subscription, future)); |
| 1235 _cancelAndErrorClosure(subscription, future) | 1196 }, |
| 1236 ); | 1197 onError: future._completeError, |
| 1237 }, | 1198 onDone: () { |
| 1238 onError: future._completeError, | 1199 if (foundResult) { |
| 1239 onDone: () { | 1200 future._complete(result); |
| 1240 if (foundResult) { | 1201 return; |
| 1241 future._complete(result); | 1202 } |
| 1242 return; | 1203 try { |
| 1243 } | 1204 throw IterableElementError.noElement(); |
| 1244 try { | 1205 } catch (e, s) { |
| 1245 throw IterableElementError.noElement(); | 1206 _completeWithErrorCallback(future, e, s); |
| 1246 } catch (e, s) { | 1207 } |
| 1247 _completeWithErrorCallback(future, e, s); | 1208 }, |
| 1248 } | 1209 cancelOnError: true); |
| 1249 }, | |
| 1250 cancelOnError: true); | |
| 1251 return future; | 1210 return future; |
| 1252 } | 1211 } |
| 1253 | 1212 |
| 1254 /** | 1213 /** |
| 1255 * Returns the value of the [index]th data event of this stream. | 1214 * Returns the value of the [index]th data event of this stream. |
| 1256 * | 1215 * |
| 1257 * Stops listening to the stream after the [index]th data event has been | 1216 * Stops listening to the stream after the [index]th data event has been |
| 1258 * received. | 1217 * received. |
| 1259 * | 1218 * |
| 1260 * Internally the method cancels its subscription after these elements. This | 1219 * Internally the method cancels its subscription after these elements. This |
| 1261 * means that single-subscription (non-broadcast) streams are closed and | 1220 * means that single-subscription (non-broadcast) streams are closed and |
| 1262 * cannot be reused after a call to this method. | 1221 * cannot be reused after a call to this method. |
| 1263 * | 1222 * |
| 1264 * If an error event occurs before the value is found, the future completes | 1223 * If an error event occurs before the value is found, the future completes |
| 1265 * with this error. | 1224 * with this error. |
| 1266 * | 1225 * |
| 1267 * If a done event occurs before the value is found, the future completes | 1226 * If a done event occurs before the value is found, the future completes |
| 1268 * with a [RangeError]. | 1227 * with a [RangeError]. |
| 1269 */ | 1228 */ |
| 1270 Future<T> elementAt(int index) { | 1229 Future<T> elementAt(int index) { |
| 1271 if (index is! int || index < 0) throw new ArgumentError(index); | 1230 if (index is! int || index < 0) throw new ArgumentError(index); |
| 1272 _Future<T> future = new _Future<T>(); | 1231 _Future<T> future = new _Future<T>(); |
| 1273 StreamSubscription subscription; | 1232 StreamSubscription subscription; |
| 1274 int elementIndex = 0; | 1233 int elementIndex = 0; |
| 1275 subscription = this.listen( | 1234 subscription = this.listen( |
| 1276 (T value) { | 1235 (T value) { |
| 1277 if (index == elementIndex) { | 1236 if (index == elementIndex) { |
| 1278 _cancelAndValue(subscription, future, value); | 1237 _cancelAndValue(subscription, future, value); |
| 1279 return; | 1238 return; |
| 1280 } | 1239 } |
| 1281 elementIndex += 1; | 1240 elementIndex += 1; |
| 1282 }, | 1241 }, |
| 1283 onError: future._completeError, | 1242 onError: future._completeError, |
| 1284 onDone: () { | 1243 onDone: () { |
| 1285 future._completeError( | 1244 future._completeError( |
| 1286 new RangeError.index(index, this, "index", null, elementIndex)); | 1245 new RangeError.index(index, this, "index", null, elementIndex)); |
| 1287 }, | 1246 }, |
| 1288 cancelOnError: true); | 1247 cancelOnError: true); |
| 1289 return future; | 1248 return future; |
| 1290 } | 1249 } |
| 1291 | 1250 |
| 1292 /** | 1251 /** |
| 1293 * Creates a new stream with the same events as this stream. | 1252 * Creates a new stream with the same events as this stream. |
| 1294 * | 1253 * |
| 1295 * Whenever more than [timeLimit] passes between two events from this stream, | 1254 * Whenever more than [timeLimit] passes between two events from this stream, |
| 1296 * the [onTimeout] function is called. | 1255 * the [onTimeout] function is called. |
| 1297 * | 1256 * |
| 1298 * The countdown doesn't start until the returned stream is listened to. | 1257 * The countdown doesn't start until the returned stream is listened to. |
| (...skipping 18 matching lines...) Expand all Loading... |
| 1317 StreamSubscription<T> subscription; | 1276 StreamSubscription<T> subscription; |
| 1318 Timer timer; | 1277 Timer timer; |
| 1319 Zone zone; | 1278 Zone zone; |
| 1320 _TimerCallback timeout; | 1279 _TimerCallback timeout; |
| 1321 | 1280 |
| 1322 void onData(T event) { | 1281 void onData(T event) { |
| 1323 timer.cancel(); | 1282 timer.cancel(); |
| 1324 controller.add(event); | 1283 controller.add(event); |
| 1325 timer = zone.createTimer(timeLimit, timeout); | 1284 timer = zone.createTimer(timeLimit, timeout); |
| 1326 } | 1285 } |
| 1286 |
| 1327 void onError(error, StackTrace stackTrace) { | 1287 void onError(error, StackTrace stackTrace) { |
| 1328 timer.cancel(); | 1288 timer.cancel(); |
| 1329 assert(controller is _StreamController || | 1289 assert(controller is _StreamController || |
| 1330 controller is _BroadcastStreamController); | 1290 controller is _BroadcastStreamController); |
| 1331 dynamic eventSink = controller; | 1291 dynamic eventSink = controller; |
| 1332 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | 1292 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
| 1333 timer = zone.createTimer(timeLimit, timeout); | 1293 timer = zone.createTimer(timeLimit, timeout); |
| 1334 } | 1294 } |
| 1295 |
| 1335 void onDone() { | 1296 void onDone() { |
| 1336 timer.cancel(); | 1297 timer.cancel(); |
| 1337 controller.close(); | 1298 controller.close(); |
| 1338 } | 1299 } |
| 1300 |
| 1339 void onListen() { | 1301 void onListen() { |
| 1340 // This is the onListen callback for of controller. | 1302 // This is the onListen callback for of controller. |
| 1341 // It runs in the same zone that the subscription was created in. | 1303 // It runs in the same zone that the subscription was created in. |
| 1342 // Use that zone for creating timers and running the onTimeout | 1304 // Use that zone for creating timers and running the onTimeout |
| 1343 // callback. | 1305 // callback. |
| 1344 zone = Zone.current; | 1306 zone = Zone.current; |
| 1345 if (onTimeout == null) { | 1307 if (onTimeout == null) { |
| 1346 timeout = () { | 1308 timeout = () { |
| 1347 controller.addError(new TimeoutException("No stream event", | 1309 controller.addError( |
| 1348 timeLimit), null); | 1310 new TimeoutException("No stream event", timeLimit), null); |
| 1349 }; | 1311 }; |
| 1350 } else { | 1312 } else { |
| 1351 // TODO(floitsch): the return type should be 'void', and the type | 1313 // TODO(floitsch): the return type should be 'void', and the type |
| 1352 // should be inferred. | 1314 // should be inferred. |
| 1353 var registeredOnTimeout = | 1315 var registeredOnTimeout = |
| 1354 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); | 1316 zone.registerUnaryCallback<dynamic, EventSink<T>>(onTimeout); |
| 1355 _ControllerEventSinkWrapper wrapper = | 1317 _ControllerEventSinkWrapper wrapper = |
| 1356 new _ControllerEventSinkWrapper(null); | 1318 new _ControllerEventSinkWrapper(null); |
| 1357 timeout = () { | 1319 timeout = () { |
| 1358 wrapper._sink = controller; // Only valid during call. | 1320 wrapper._sink = controller; // Only valid during call. |
| 1359 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | 1321 zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
| 1360 wrapper._sink = null; | 1322 wrapper._sink = null; |
| 1361 }; | 1323 }; |
| 1362 } | 1324 } |
| 1363 | 1325 |
| 1364 subscription = this.listen(onData, onError: onError, onDone: onDone); | 1326 subscription = this.listen(onData, onError: onError, onDone: onDone); |
| 1365 timer = zone.createTimer(timeLimit, timeout); | 1327 timer = zone.createTimer(timeLimit, timeout); |
| 1366 } | 1328 } |
| 1329 |
| 1367 Future onCancel() { | 1330 Future onCancel() { |
| 1368 timer.cancel(); | 1331 timer.cancel(); |
| 1369 Future result = subscription.cancel(); | 1332 Future result = subscription.cancel(); |
| 1370 subscription = null; | 1333 subscription = null; |
| 1371 return result; | 1334 return result; |
| 1372 } | 1335 } |
| 1336 |
| 1373 controller = isBroadcast | 1337 controller = isBroadcast |
| 1374 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | 1338 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
| 1375 : new _SyncStreamController<T>( | 1339 : new _SyncStreamController<T>(onListen, () { |
| 1376 onListen, | 1340 // Don't null the timer, onCancel may call cancel again. |
| 1377 () { | 1341 timer.cancel(); |
| 1378 // Don't null the timer, onCancel may call cancel again. | 1342 subscription.pause(); |
| 1379 timer.cancel(); | 1343 }, () { |
| 1380 subscription.pause(); | 1344 subscription.resume(); |
| 1381 }, | 1345 timer = zone.createTimer(timeLimit, timeout); |
| 1382 () { | 1346 }, onCancel); |
| 1383 subscription.resume(); | |
| 1384 timer = zone.createTimer(timeLimit, timeout); | |
| 1385 }, | |
| 1386 onCancel); | |
| 1387 return controller.stream; | 1347 return controller.stream; |
| 1388 } | 1348 } |
| 1389 } | 1349 } |
| 1390 | 1350 |
| 1391 /** | 1351 /** |
| 1392 * A subscription on events from a [Stream]. | 1352 * A subscription on events from a [Stream]. |
| 1393 * | 1353 * |
| 1394 * When you listen on a [Stream] using [Stream.listen], | 1354 * When you listen on a [Stream] using [Stream.listen], |
| 1395 * a [StreamSubscription] object is returned. | 1355 * a [StreamSubscription] object is returned. |
| 1396 * | 1356 * |
| (...skipping 97 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1494 * | 1454 * |
| 1495 * In case of an error the subscription will automatically cancel (even | 1455 * In case of an error the subscription will automatically cancel (even |
| 1496 * when it was listening with `cancelOnError` set to `false`). | 1456 * when it was listening with `cancelOnError` set to `false`). |
| 1497 * | 1457 * |
| 1498 * In case of a `done` event the future completes with the given | 1458 * In case of a `done` event the future completes with the given |
| 1499 * [futureValue]. | 1459 * [futureValue]. |
| 1500 */ | 1460 */ |
| 1501 Future<E> asFuture<E>([E futureValue]); | 1461 Future<E> asFuture<E>([E futureValue]); |
| 1502 } | 1462 } |
| 1503 | 1463 |
| 1504 | |
| 1505 /** | 1464 /** |
| 1506 * An interface that abstracts creation or handling of [Stream] events. | 1465 * An interface that abstracts creation or handling of [Stream] events. |
| 1507 */ | 1466 */ |
| 1508 abstract class EventSink<T> implements Sink<T> { | 1467 abstract class EventSink<T> implements Sink<T> { |
| 1509 /** Send a data event to a stream. */ | 1468 /** Send a data event to a stream. */ |
| 1510 void add(T event); | 1469 void add(T event); |
| 1511 | 1470 |
| 1512 /** Send an async error to a stream. */ | 1471 /** Send an async error to a stream. */ |
| 1513 void addError(errorEvent, [StackTrace stackTrace]); | 1472 void addError(errorEvent, [StackTrace stackTrace]); |
| 1514 | 1473 |
| 1515 /** Close the sink. No further events can be added after closing. */ | 1474 /** Close the sink. No further events can be added after closing. */ |
| 1516 void close(); | 1475 void close(); |
| 1517 } | 1476 } |
| 1518 | 1477 |
| 1519 | |
| 1520 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1478 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 1521 class StreamView<T> extends Stream<T> { | 1479 class StreamView<T> extends Stream<T> { |
| 1522 final Stream<T> _stream; | 1480 final Stream<T> _stream; |
| 1523 | 1481 |
| 1524 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); | 1482 const StreamView(Stream<T> stream) |
| 1483 : _stream = stream, |
| 1484 super._internal(); |
| 1525 | 1485 |
| 1526 bool get isBroadcast => _stream.isBroadcast; | 1486 bool get isBroadcast => _stream.isBroadcast; |
| 1527 | 1487 |
| 1528 Stream<T> asBroadcastStream( | 1488 Stream<T> asBroadcastStream( |
| 1529 {void onListen(StreamSubscription<T> subscription), | 1489 {void onListen(StreamSubscription<T> subscription), |
| 1530 void onCancel(StreamSubscription<T> subscription)}) | 1490 void onCancel(StreamSubscription<T> subscription)}) => |
| 1531 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 1491 _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
| 1532 | 1492 |
| 1533 StreamSubscription<T> listen(void onData(T value), | 1493 StreamSubscription<T> listen(void onData(T value), |
| 1534 { Function onError, | 1494 {Function onError, void onDone(), bool cancelOnError}) { |
| 1535 void onDone(), | 1495 return _stream.listen(onData, |
| 1536 bool cancelOnError }) { | 1496 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 1537 return _stream.listen(onData, onError: onError, onDone: onDone, | |
| 1538 cancelOnError: cancelOnError); | |
| 1539 } | 1497 } |
| 1540 } | 1498 } |
| 1541 | 1499 |
| 1542 | |
| 1543 /** | 1500 /** |
| 1544 * Abstract interface for a "sink" accepting multiple entire streams. | 1501 * Abstract interface for a "sink" accepting multiple entire streams. |
| 1545 * | 1502 * |
| 1546 * A consumer can accept a number of consecutive streams using [addStream], | 1503 * A consumer can accept a number of consecutive streams using [addStream], |
| 1547 * and when no further data need to be added, the [close] method tells the | 1504 * and when no further data need to be added, the [close] method tells the |
| 1548 * consumer to complete its work and shut down. | 1505 * consumer to complete its work and shut down. |
| 1549 * | 1506 * |
| 1550 * This class is not just a [Sink<Stream>] because it is also combined with | 1507 * This class is not just a [Sink<Stream>] because it is also combined with |
| 1551 * other [Sink] classes, like it's combined with [EventSink] in the | 1508 * other [Sink] classes, like it's combined with [EventSink] in the |
| 1552 * [StreamSink] class. | 1509 * [StreamSink] class. |
| (...skipping 29 matching lines...) Expand all Loading... |
| 1582 * This allows the consumer to complete any remaining work and release | 1539 * This allows the consumer to complete any remaining work and release |
| 1583 * resources that are no longer needed | 1540 * resources that are no longer needed |
| 1584 * | 1541 * |
| 1585 * Returns a future which is completed when the consumer has shut down. | 1542 * Returns a future which is completed when the consumer has shut down. |
| 1586 * If cleaning up can fail, the error may be reported in the returned future, | 1543 * If cleaning up can fail, the error may be reported in the returned future, |
| 1587 * otherwise it completes with `null`. | 1544 * otherwise it completes with `null`. |
| 1588 */ | 1545 */ |
| 1589 Future close(); | 1546 Future close(); |
| 1590 } | 1547 } |
| 1591 | 1548 |
| 1592 | |
| 1593 /** | 1549 /** |
| 1594 * A object that accepts stream events both synchronously and asynchronously. | 1550 * A object that accepts stream events both synchronously and asynchronously. |
| 1595 * | 1551 * |
| 1596 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and | 1552 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and |
| 1597 * the synchronous methods from [EventSink]. | 1553 * the synchronous methods from [EventSink]. |
| 1598 * | 1554 * |
| 1599 * The [EventSink] methods can't be used while the [addStream] is called. | 1555 * The [EventSink] methods can't be used while the [addStream] is called. |
| 1600 * As soon as the [addStream]'s [Future] completes with a value, the | 1556 * As soon as the [addStream]'s [Future] completes with a value, the |
| 1601 * [EventSink] methods can be used again. | 1557 * [EventSink] methods can be used again. |
| 1602 * | 1558 * |
| (...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1641 * | 1597 * |
| 1642 * Otherwise, the returned future will complete when either: | 1598 * Otherwise, the returned future will complete when either: |
| 1643 * | 1599 * |
| 1644 * * all events have been processed and the sink has been closed, or | 1600 * * all events have been processed and the sink has been closed, or |
| 1645 * * the sink has otherwise been stopped from handling more events | 1601 * * the sink has otherwise been stopped from handling more events |
| 1646 * (for example by cancelling a stream subscription). | 1602 * (for example by cancelling a stream subscription). |
| 1647 */ | 1603 */ |
| 1648 Future get done; | 1604 Future get done; |
| 1649 } | 1605 } |
| 1650 | 1606 |
| 1651 | |
| 1652 /** | 1607 /** |
| 1653 * The target of a [Stream.transform] call. | 1608 * The target of a [Stream.transform] call. |
| 1654 * | 1609 * |
| 1655 * The [Stream.transform] call will pass itself to this object and then return | 1610 * The [Stream.transform] call will pass itself to this object and then return |
| 1656 * the resulting stream. | 1611 * the resulting stream. |
| 1657 * | 1612 * |
| 1658 * It is good practice to write transformers that can be used multiple times. | 1613 * It is good practice to write transformers that can be used multiple times. |
| 1659 */ | 1614 */ |
| 1660 abstract class StreamTransformer<S, T> { | 1615 abstract class StreamTransformer<S, T> { |
| 1661 /** | 1616 /** |
| (...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1715 * cancelOnError: cancelOnError); | 1670 * cancelOnError: cancelOnError); |
| 1716 * }, | 1671 * }, |
| 1717 * onPause: () { subscription.pause(); }, | 1672 * onPause: () { subscription.pause(); }, |
| 1718 * onResume: () { subscription.resume(); }, | 1673 * onResume: () { subscription.resume(); }, |
| 1719 * onCancel: () => subscription.cancel(), | 1674 * onCancel: () => subscription.cancel(), |
| 1720 * sync: true); | 1675 * sync: true); |
| 1721 * return controller.stream.listen(null); | 1676 * return controller.stream.listen(null); |
| 1722 * }); | 1677 * }); |
| 1723 */ | 1678 */ |
| 1724 const factory StreamTransformer( | 1679 const factory StreamTransformer( |
| 1725 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | 1680 StreamSubscription<T> transformer( |
| 1726 = _StreamSubscriptionTransformer<S, T>; | 1681 Stream<S> stream, bool cancelOnError)) = |
| 1682 _StreamSubscriptionTransformer<S, T>; |
| 1727 | 1683 |
| 1728 /** | 1684 /** |
| 1729 * Creates a [StreamTransformer] that delegates events to the given functions. | 1685 * Creates a [StreamTransformer] that delegates events to the given functions. |
| 1730 * | 1686 * |
| 1731 * Example use of a duplicating transformer: | 1687 * Example use of a duplicating transformer: |
| 1732 * | 1688 * |
| 1733 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1689 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
| 1734 * handleData: (String value, EventSink<String> sink) { | 1690 * handleData: (String value, EventSink<String> sink) { |
| 1735 * sink.add(value); | 1691 * sink.add(value); |
| 1736 * sink.add(value); // Duplicate the incoming events. | 1692 * sink.add(value); // Duplicate the incoming events. |
| 1737 * })); | 1693 * })); |
| 1738 */ | 1694 */ |
| 1739 factory StreamTransformer.fromHandlers({ | 1695 factory StreamTransformer.fromHandlers( |
| 1740 void handleData(S data, EventSink<T> sink), | 1696 {void handleData(S data, EventSink<T> sink), |
| 1741 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1697 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
| 1742 void handleDone(EventSink<T> sink)}) | 1698 void handleDone(EventSink<T> sink)}) = _StreamHandlerTransformer<S, T>; |
| 1743 = _StreamHandlerTransformer<S, T>; | |
| 1744 | 1699 |
| 1745 /** | 1700 /** |
| 1746 * Transform the incoming [stream]'s events. | 1701 * Transform the incoming [stream]'s events. |
| 1747 * | 1702 * |
| 1748 * Creates a new stream. | 1703 * Creates a new stream. |
| 1749 * When this stream is listened to, it will start listening on [stream], | 1704 * When this stream is listened to, it will start listening on [stream], |
| 1750 * and generate events on the new stream based on the events from [stream]. | 1705 * and generate events on the new stream based on the events from [stream]. |
| 1751 * | 1706 * |
| 1752 * Subscriptions on the returned stream should propagate pause state | 1707 * Subscriptions on the returned stream should propagate pause state |
| 1753 * to the subscription on [stream]. | 1708 * to the subscription on [stream]. |
| 1754 */ | 1709 */ |
| 1755 Stream<T> bind(Stream<S> stream); | 1710 Stream<T> bind(Stream<S> stream); |
| 1756 } | 1711 } |
| 1757 | 1712 |
| 1758 /** | 1713 /** |
| 1759 * An [Iterator] like interface for the values of a [Stream]. | 1714 * An [Iterator] like interface for the values of a [Stream]. |
| 1760 * | 1715 * |
| 1761 * This wraps a [Stream] and a subscription on the stream. It listens | 1716 * This wraps a [Stream] and a subscription on the stream. It listens |
| 1762 * on the stream, and completes the future returned by [moveNext] when the | 1717 * on the stream, and completes the future returned by [moveNext] when the |
| 1763 * next value becomes available. | 1718 * next value becomes available. |
| 1764 * | 1719 * |
| 1765 * The stream may be paused between calls to [moveNext]. | 1720 * The stream may be paused between calls to [moveNext]. |
| 1766 */ | 1721 */ |
| 1767 abstract class StreamIterator<T> { | 1722 abstract class StreamIterator<T> { |
| 1768 | |
| 1769 /** Create a [StreamIterator] on [stream]. */ | 1723 /** Create a [StreamIterator] on [stream]. */ |
| 1770 factory StreamIterator(Stream<T> stream) | 1724 factory StreamIterator(Stream<T> stream) |
| 1771 // TODO(lrn): use redirecting factory constructor when type | 1725 // TODO(lrn): use redirecting factory constructor when type |
| 1772 // arguments are supported. | 1726 // arguments are supported. |
| 1773 => new _StreamIterator<T>(stream); | 1727 => |
| 1728 new _StreamIterator<T>(stream); |
| 1774 | 1729 |
| 1775 /** | 1730 /** |
| 1776 * Wait for the next stream value to be available. | 1731 * Wait for the next stream value to be available. |
| 1777 * | 1732 * |
| 1778 * Returns a future which will complete with either `true` or `false`. | 1733 * Returns a future which will complete with either `true` or `false`. |
| 1779 * Completing with `true` means that another event has been received and | 1734 * Completing with `true` means that another event has been received and |
| 1780 * can be read as [current]. | 1735 * can be read as [current]. |
| 1781 * Completing with `false` means that the stream iteration is done and | 1736 * Completing with `false` means that the stream iteration is done and |
| 1782 * no further events will ever be available. | 1737 * no further events will ever be available. |
| 1783 * The future may complete with an error, if the stream produces an error, | 1738 * The future may complete with an error, if the stream produces an error, |
| (...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1815 * If [moveNext] has been called when the iterator is canceled, | 1770 * If [moveNext] has been called when the iterator is canceled, |
| 1816 * its returned future will complete with `false` as value, | 1771 * its returned future will complete with `false` as value, |
| 1817 * as will all further calls to [moveNext]. | 1772 * as will all further calls to [moveNext]. |
| 1818 * | 1773 * |
| 1819 * Returns a future if the cancel-operation is not completed synchronously. | 1774 * Returns a future if the cancel-operation is not completed synchronously. |
| 1820 * Otherwise returns `null`. | 1775 * Otherwise returns `null`. |
| 1821 */ | 1776 */ |
| 1822 Future cancel(); | 1777 Future cancel(); |
| 1823 } | 1778 } |
| 1824 | 1779 |
| 1825 | |
| 1826 /** | 1780 /** |
| 1827 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 1781 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
| 1828 */ | 1782 */ |
| 1829 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1783 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
| 1830 EventSink _sink; | 1784 EventSink _sink; |
| 1831 _ControllerEventSinkWrapper(this._sink); | 1785 _ControllerEventSinkWrapper(this._sink); |
| 1832 | 1786 |
| 1833 void add(T data) { _sink.add(data); } | 1787 void add(T data) { |
| 1788 _sink.add(data); |
| 1789 } |
| 1790 |
| 1834 void addError(error, [StackTrace stackTrace]) { | 1791 void addError(error, [StackTrace stackTrace]) { |
| 1835 _sink.addError(error, stackTrace); | 1792 _sink.addError(error, stackTrace); |
| 1836 } | 1793 } |
| 1837 void close() { _sink.close(); } | 1794 |
| 1795 void close() { |
| 1796 _sink.close(); |
| 1797 } |
| 1838 } | 1798 } |
| OLD | NEW |