| OLD | NEW |
| (Empty) |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 part of dart.async; | |
| 6 | |
| 7 // ------------------------------------------------------------------- | |
| 8 // Core Stream types | |
| 9 // ------------------------------------------------------------------- | |
| 10 | |
| 11 typedef void _TimerCallback(); | |
| 12 | |
| 13 /** | |
| 14 * A source of asynchronous data events. | |
| 15 * | |
| 16 * A Stream provides a way to receive a sequence of events. | |
| 17 * Each event is either a data event or an error event, | |
| 18 * representing the result of a single computation. | |
| 19 * When the events provided by a Stream have all been sent, | |
| 20 * a single "done" event will mark the end. | |
| 21 * | |
| 22 * You can [listen] on a stream to make it start generating events, | |
| 23 * and to set up listeners that receive the events. | |
| 24 * When you listen, you receive a [StreamSubscription] object | |
| 25 * which is the active object providing the events, | |
| 26 * and which can be used to stop listening again, | |
| 27 * or to temporarily pause events from the subscription. | |
| 28 * | |
| 29 * There are two kinds of streams: "Single-subscription" streams and | |
| 30 * "broadcast" streams. | |
| 31 * | |
| 32 * *A single-subscription stream* allows only a single listener during the whole | |
| 33 * lifetime of the stream. | |
| 34 * It doesn't start generating events until it has a listener, | |
| 35 * and it stops sending events when the listener is unsubscribed, | |
| 36 * even if the source of events could still provide more. | |
| 37 * | |
| 38 * Listening twice on a single-subscription stream is not allowed, even after | |
| 39 * the first subscription has been canceled. | |
| 40 * | |
| 41 * Single-subscription streams are generally used for streaming chunks of | |
| 42 * larger contiguous data like file I/O. | |
| 43 * | |
| 44 * *A broadcast stream* allows any number of listeners, and it fires | |
| 45 * its events when they are ready, whether there are listeners or not. | |
| 46 * | |
| 47 * Broadcast streams are used for independent events/observers. | |
| 48 * | |
| 49 * If several listeners want to listen to a single subscription stream, | |
| 50 * use [asBroadcastStream] to create a broadcast stream on top of the | |
| 51 * non-broadcast stream. | |
| 52 * | |
| 53 * On either kind of stream, stream transformations, such as [where] and | |
| 54 * [skip], return the same type of stream as the one the method was called on, | |
| 55 * unless otherwise noted. | |
| 56 * | |
| 57 * When an event is fired, the listener(s) at that time will receive the event. | |
| 58 * If a listener is added to a broadcast stream while an event is being fired, | |
| 59 * that listener will not receive the event currently being fired. | |
| 60 * If a listener is canceled, it immediately stops receiving events. | |
| 61 * | |
| 62 * When the "done" event is fired, subscribers are unsubscribed before | |
| 63 * receiving the event. After the event has been sent, the stream has no | |
| 64 * subscribers. Adding new subscribers to a broadcast stream after this point | |
| 65 * is allowed, but they will just receive a new "done" event as soon | |
| 66 * as possible. | |
| 67 * | |
| 68 * Stream subscriptions always respect "pause" requests. If necessary they need | |
| 69 * to buffer their input, but often, and preferably, they can simply request | |
| 70 * their input to pause too. | |
| 71 * | |
| 72 * The default implementation of [isBroadcast] returns false. | |
| 73 * A broadcast stream inheriting from [Stream] must override [isBroadcast] | |
| 74 * to return `true`. | |
| 75 */ | |
| 76 abstract class Stream<T> { | |
| 77 Stream(); | |
| 78 | |
| 79 /** | |
| 80 * Internal use only. We do not want to promise that Stream stays const. | |
| 81 * | |
| 82 * If mixins become compatible with const constructors, we may use a | |
| 83 * stream mixin instead of extending Stream from a const class. | |
| 84 */ | |
| 85 const Stream._internal(); | |
| 86 | |
| 87 /** | |
| 88 * Creates an empty broadcast stream. | |
| 89 * | |
| 90 * This is a stream which does nothing except sending a done event | |
| 91 * when it's listened to. | |
| 92 */ | |
| 93 const factory Stream.empty() = _EmptyStream<T>; | |
| 94 | |
| 95 /** | |
| 96 * Creates a new single-subscription stream from the future. | |
| 97 * | |
| 98 * When the future completes, the stream will fire one event, either | |
| 99 * data or error, and then close with a done-event. | |
| 100 */ | |
| 101 factory Stream.fromFuture(Future<T> future) { | |
| 102 // Use the controller's buffering to fill in the value even before | |
| 103 // the stream has a listener. For a single value, it's not worth it | |
| 104 // to wait for a listener before doing the `then` on the future. | |
| 105 _StreamController<T> controller = new StreamController<T>(sync: true); | |
| 106 future.then((value) { | |
| 107 controller._add(value); | |
| 108 controller._closeUnchecked(); | |
| 109 }, | |
| 110 onError: (error, stackTrace) { | |
| 111 controller._addError(error, stackTrace); | |
| 112 controller._closeUnchecked(); | |
| 113 }); | |
| 114 return controller.stream; | |
| 115 } | |
| 116 | |
| 117 /** | |
| 118 * Create a stream from a group of futures. | |
| 119 * | |
| 120 * The stream reports the results of the futures on the stream in the order | |
| 121 * in which the futures complete. | |
| 122 * | |
| 123 * If some futures have completed before calling `Stream.fromFutures`, | |
| 124 * their result will be output on the created stream in some unspecified | |
| 125 * order. | |
| 126 * | |
| 127 * When all futures have completed, the stream is closed. | |
| 128 * | |
| 129 * If no future is passed, the stream closes as soon as possible. | |
| 130 */ | |
| 131 factory Stream.fromFutures(Iterable<Future<T>> futures) { | |
| 132 _StreamController<T> controller = new StreamController<T>(sync: true); | |
| 133 int count = 0; | |
| 134 var onValue = (T value) { | |
| 135 if (!controller.isClosed) { | |
| 136 controller._add(value); | |
| 137 if (--count == 0) controller._closeUnchecked(); | |
| 138 } | |
| 139 }; | |
| 140 var onError = (error, stack) { | |
| 141 if (!controller.isClosed) { | |
| 142 controller._addError(error, stack); | |
| 143 if (--count == 0) controller._closeUnchecked(); | |
| 144 } | |
| 145 }; | |
| 146 // The futures are already running, so start listening to them immediately | |
| 147 // (instead of waiting for the stream to be listened on). | |
| 148 // If we wait, we might not catch errors in the futures in time. | |
| 149 for (var future in futures) { | |
| 150 count++; | |
| 151 future.then(onValue, onError: onError); | |
| 152 } | |
| 153 // Use schedule microtask since controller is sync. | |
| 154 if (count == 0) scheduleMicrotask(controller.close); | |
| 155 return controller.stream; | |
| 156 } | |
| 157 | |
| 158 /** | |
| 159 * Creates a single-subscription stream that gets its data from [data]. | |
| 160 * | |
| 161 * The iterable is iterated when the stream receives a listener, and stops | |
| 162 * iterating if the listener cancels the subscription. | |
| 163 * | |
| 164 * If iterating [data] throws an error, the stream ends immediately with | |
| 165 * that error. No done event will be sent (iteration is not complete), but no | |
| 166 * further data events will be generated either, since iteration cannot | |
| 167 * continue. | |
| 168 */ | |
| 169 factory Stream.fromIterable(Iterable<T> data) { | |
| 170 return new _GeneratedStreamImpl<T>( | |
| 171 () => new _IterablePendingEvents<T>(data)); | |
| 172 } | |
| 173 | |
| 174 /** | |
| 175 * Creates a stream that repeatedly emits events at [period] intervals. | |
| 176 * | |
| 177 * The event values are computed by invoking [computation]. The argument to | |
| 178 * this callback is an integer that starts with 0 and is incremented for | |
| 179 * every event. | |
| 180 * | |
| 181 * If [computation] is omitted the event values will all be `null`. | |
| 182 */ | |
| 183 factory Stream.periodic(Duration period, | |
| 184 [T computation(int computationCount)]) { | |
| 185 Timer timer; | |
| 186 int computationCount = 0; | |
| 187 StreamController<T> controller; | |
| 188 // Counts the time that the Stream was running (and not paused). | |
| 189 Stopwatch watch = new Stopwatch(); | |
| 190 | |
| 191 void sendEvent() { | |
| 192 watch.reset(); | |
| 193 T data; | |
| 194 if (computation != null) { | |
| 195 try { | |
| 196 data = computation(computationCount++); | |
| 197 } catch (e, s) { | |
| 198 controller.addError(e, s); | |
| 199 return; | |
| 200 } | |
| 201 } | |
| 202 controller.add(data); | |
| 203 } | |
| 204 | |
| 205 void startPeriodicTimer() { | |
| 206 assert(timer == null); | |
| 207 timer = new Timer.periodic(period, (Timer timer) { | |
| 208 sendEvent(); | |
| 209 }); | |
| 210 } | |
| 211 | |
| 212 controller = new StreamController<T>(sync: true, | |
| 213 onListen: () { | |
| 214 watch.start(); | |
| 215 startPeriodicTimer(); | |
| 216 }, | |
| 217 onPause: () { | |
| 218 timer.cancel(); | |
| 219 timer = null; | |
| 220 watch.stop(); | |
| 221 }, | |
| 222 onResume: () { | |
| 223 assert(timer == null); | |
| 224 Duration elapsed = watch.elapsed; | |
| 225 watch.start(); | |
| 226 timer = new Timer(period - elapsed, () { | |
| 227 timer = null; | |
| 228 startPeriodicTimer(); | |
| 229 sendEvent(); | |
| 230 }); | |
| 231 }, | |
| 232 onCancel: () { | |
| 233 if (timer != null) timer.cancel(); | |
| 234 timer = null; | |
| 235 }); | |
| 236 return controller.stream; | |
| 237 } | |
| 238 | |
| 239 /** | |
| 240 * Creates a stream where all events of an existing stream are piped through | |
| 241 * a sink-transformation. | |
| 242 * | |
| 243 * The given [mapSink] closure is invoked when the returned stream is | |
| 244 * listened to. All events from the [source] are added into the event sink | |
| 245 * that is returned from the invocation. The transformation puts all | |
| 246 * transformed events into the sink the [mapSink] closure received during | |
| 247 * its invocation. Conceptually the [mapSink] creates a transformation pipe | |
| 248 * with the input sink being the returned [EventSink] and the output sink | |
| 249 * being the sink it received. | |
| 250 * | |
| 251 * This constructor is frequently used to build transformers. | |
| 252 * | |
| 253 * Example use for a duplicating transformer: | |
| 254 * | |
| 255 * class DuplicationSink implements EventSink<String> { | |
| 256 * final EventSink<String> _outputSink; | |
| 257 * DuplicationSink(this._outputSink); | |
| 258 * | |
| 259 * void add(String data) { | |
| 260 * _outputSink.add(data); | |
| 261 * _outputSink.add(data); | |
| 262 * } | |
| 263 * | |
| 264 * void addError(e, [st]) { _outputSink.addError(e, st); } | |
| 265 * void close() { _outputSink.close(); } | |
| 266 * } | |
| 267 * | |
| 268 * class DuplicationTransformer implements StreamTransformer<String, Strin
g> { | |
| 269 * // Some generic types ommitted for brevety. | |
| 270 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( | |
| 271 * stream, | |
| 272 * (EventSink sink) => new DuplicationSink(sink)); | |
| 273 * } | |
| 274 * | |
| 275 * stringStream.transform(new DuplicationTransformer()); | |
| 276 * | |
| 277 * The resulting stream is a broadcast stream if [source] is. | |
| 278 */ | |
| 279 factory Stream.eventTransformed(Stream source, | |
| 280 EventSink mapSink(EventSink<T> sink)) { | |
| 281 return new _BoundSinkStream(source, mapSink); | |
| 282 } | |
| 283 | |
| 284 /** | |
| 285 * Reports whether this stream is a broadcast stream. | |
| 286 */ | |
| 287 bool get isBroadcast => false; | |
| 288 | |
| 289 /** | |
| 290 * Returns a multi-subscription stream that produces the same events as this. | |
| 291 * | |
| 292 * The returned stream will subscribe to this stream when its first | |
| 293 * subscriber is added, and will stay subscribed until this stream ends, | |
| 294 * or a callback cancels the subscription. | |
| 295 * | |
| 296 * If [onListen] is provided, it is called with a subscription-like object | |
| 297 * that represents the underlying subscription to this stream. It is | |
| 298 * possible to pause, resume or cancel the subscription during the call | |
| 299 * to [onListen]. It is not possible to change the event handlers, including | |
| 300 * using [StreamSubscription.asFuture]. | |
| 301 * | |
| 302 * If [onCancel] is provided, it is called in a similar way to [onListen] | |
| 303 * when the returned stream stops having listener. If it later gets | |
| 304 * a new listener, the [onListen] function is called again. | |
| 305 * | |
| 306 * Use the callbacks, for example, for pausing the underlying subscription | |
| 307 * while having no subscribers to prevent losing events, or canceling the | |
| 308 * subscription when there are no listeners. | |
| 309 */ | |
| 310 Stream<T> asBroadcastStream({ | |
| 311 void onListen(StreamSubscription<T> subscription), | |
| 312 void onCancel(StreamSubscription<T> subscription) }) { | |
| 313 return new _AsBroadcastStream<T>(this, onListen, onCancel); | |
| 314 } | |
| 315 | |
| 316 /** | |
| 317 * Adds a subscription to this stream. | |
| 318 * | |
| 319 * On each data event from this stream, the subscriber's [onData] handler | |
| 320 * is called. If [onData] is null, nothing happens. | |
| 321 * | |
| 322 * On errors from this stream, the [onError] handler is given a | |
| 323 * object describing the error. | |
| 324 * | |
| 325 * The [onError] callback must be of type `void onError(error)` or | |
| 326 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts | |
| 327 * two arguments it is called with the stack trace (which could be `null` if | |
| 328 * the stream itself received an error without stack trace). | |
| 329 * Otherwise it is called with just the error object. | |
| 330 * If [onError] is omitted, any errors on the stream are considered unhandled, | |
| 331 * and will be passed to the current [Zone]'s error handler. | |
| 332 * By default unhandled async errors are treated | |
| 333 * as if they were uncaught top-level errors. | |
| 334 * | |
| 335 * If this stream closes, the [onDone] handler is called. | |
| 336 * | |
| 337 * If [cancelOnError] is true, the subscription is ended when | |
| 338 * the first error is reported. The default is false. | |
| 339 */ | |
| 340 StreamSubscription<T> listen(void onData(T event), | |
| 341 { Function onError, | |
| 342 void onDone(), | |
| 343 bool cancelOnError}); | |
| 344 | |
| 345 /** | |
| 346 * Creates a new stream from this stream that discards some data events. | |
| 347 * | |
| 348 * The new stream sends the same error and done events as this stream, | |
| 349 * but it only sends the data events that satisfy the [test]. | |
| 350 * | |
| 351 * The returned stream is a broadcast stream if this stream is. | |
| 352 * If a broadcast stream is listened to more than once, each subscription | |
| 353 * will individually perform the `test`. | |
| 354 */ | |
| 355 Stream<T> where(bool test(T event)) { | |
| 356 return new _WhereStream<T>(this, test); | |
| 357 } | |
| 358 | |
| 359 /** | |
| 360 * Creates a new stream that converts each element of this stream | |
| 361 * to a new value using the [convert] function. | |
| 362 * | |
| 363 * For each data event, `o`, in this stream, the returned stream | |
| 364 * provides a data event with the value `convert(o)`. | |
| 365 * If [convert] throws, the returned stream reports the exception as an error | |
| 366 * event instead. | |
| 367 * | |
| 368 * Error and done events are passed through unchanged to the returned stream. | |
| 369 * | |
| 370 * The returned stream is a broadcast stream if this stream is. | |
| 371 * The [convert] function is called once per data event per listener. | |
| 372 * If a broadcast stream is listened to more than once, each subscription | |
| 373 * will individually call [convert] on each data event. | |
| 374 */ | |
| 375 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { | |
| 376 return new _MapStream<T, dynamic/*=S*/>(this, convert); | |
| 377 } | |
| 378 | |
| 379 /** | |
| 380 * Creates a new stream with each data event of this stream asynchronously | |
| 381 * mapped to a new event. | |
| 382 * | |
| 383 * This acts like [map], except that [convert] may return a [Future], | |
| 384 * and in that case, the stream waits for that future to complete before | |
| 385 * continuing with its result. | |
| 386 * | |
| 387 * The returned stream is a broadcast stream if this stream is. | |
| 388 */ | |
| 389 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { | |
| 390 StreamController/*<E>*/ controller; | |
| 391 StreamSubscription/*<T>*/ subscription; | |
| 392 | |
| 393 void onListen() { | |
| 394 final add = controller.add; | |
| 395 assert(controller is _StreamController || | |
| 396 controller is _BroadcastStreamController); | |
| 397 final _EventSink/*<E>*/ eventSink = | |
| 398 controller as Object /*=_EventSink<E>*/; | |
| 399 final addError = eventSink._addError; | |
| 400 subscription = this.listen( | |
| 401 (T event) { | |
| 402 dynamic newValue; | |
| 403 try { | |
| 404 newValue = convert(event); | |
| 405 } catch (e, s) { | |
| 406 controller.addError(e, s); | |
| 407 return; | |
| 408 } | |
| 409 if (newValue is Future) { | |
| 410 subscription.pause(); | |
| 411 newValue.then(add, onError: addError) | |
| 412 .whenComplete(subscription.resume); | |
| 413 } else { | |
| 414 controller.add(newValue as Object/*=E*/); | |
| 415 } | |
| 416 }, | |
| 417 onError: addError, | |
| 418 onDone: controller.close | |
| 419 ); | |
| 420 } | |
| 421 | |
| 422 if (this.isBroadcast) { | |
| 423 controller = new StreamController/*<E>*/.broadcast( | |
| 424 onListen: onListen, | |
| 425 onCancel: () { subscription.cancel(); }, | |
| 426 sync: true | |
| 427 ); | |
| 428 } else { | |
| 429 controller = new StreamController/*<E>*/( | |
| 430 onListen: onListen, | |
| 431 onPause: () { subscription.pause(); }, | |
| 432 onResume: () { subscription.resume(); }, | |
| 433 onCancel: () { subscription.cancel(); }, | |
| 434 sync: true | |
| 435 ); | |
| 436 } | |
| 437 return controller.stream; | |
| 438 } | |
| 439 | |
| 440 /** | |
| 441 * Creates a new stream with the events of a stream per original event. | |
| 442 * | |
| 443 * This acts like [expand], except that [convert] returns a [Stream] | |
| 444 * instead of an [Iterable]. | |
| 445 * The events of the returned stream becomes the events of the returned | |
| 446 * stream, in the order they are produced. | |
| 447 * | |
| 448 * If [convert] returns `null`, no value is put on the output stream, | |
| 449 * just as if it returned an empty stream. | |
| 450 * | |
| 451 * The returned stream is a broadcast stream if this stream is. | |
| 452 */ | |
| 453 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { | |
| 454 StreamController/*<E>*/ controller; | |
| 455 StreamSubscription<T> subscription; | |
| 456 void onListen() { | |
| 457 assert(controller is _StreamController || | |
| 458 controller is _BroadcastStreamController); | |
| 459 final _EventSink/*<E>*/ eventSink = | |
| 460 controller as Object /*=_EventSink<E>*/; | |
| 461 subscription = this.listen( | |
| 462 (T event) { | |
| 463 Stream/*<E>*/ newStream; | |
| 464 try { | |
| 465 newStream = convert(event); | |
| 466 } catch (e, s) { | |
| 467 controller.addError(e, s); | |
| 468 return; | |
| 469 } | |
| 470 if (newStream != null) { | |
| 471 subscription.pause(); | |
| 472 controller.addStream(newStream) | |
| 473 .whenComplete(subscription.resume); | |
| 474 } | |
| 475 }, | |
| 476 onError: eventSink._addError, // Avoid Zone error replacement. | |
| 477 onDone: controller.close | |
| 478 ); | |
| 479 } | |
| 480 if (this.isBroadcast) { | |
| 481 controller = new StreamController/*<E>*/.broadcast( | |
| 482 onListen: onListen, | |
| 483 onCancel: () { subscription.cancel(); }, | |
| 484 sync: true | |
| 485 ); | |
| 486 } else { | |
| 487 controller = new StreamController/*<E>*/( | |
| 488 onListen: onListen, | |
| 489 onPause: () { subscription.pause(); }, | |
| 490 onResume: () { subscription.resume(); }, | |
| 491 onCancel: () { subscription.cancel(); }, | |
| 492 sync: true | |
| 493 ); | |
| 494 } | |
| 495 return controller.stream; | |
| 496 } | |
| 497 | |
| 498 /** | |
| 499 * Creates a wrapper Stream that intercepts some errors from this stream. | |
| 500 * | |
| 501 * If this stream sends an error that matches [test], then it is intercepted | |
| 502 * by the [handle] function. | |
| 503 * | |
| 504 * The [onError] callback must be of type `void onError(error)` or | |
| 505 * `void onError(error, StackTrace stackTrace)`. Depending on the function | |
| 506 * type the stream either invokes [onError] with or without a stack | |
| 507 * trace. The stack trace argument might be `null` if the stream itself | |
| 508 * received an error without stack trace. | |
| 509 * | |
| 510 * An asynchronous error [:e:] is matched by a test function if [:test(e):] | |
| 511 * returns true. If [test] is omitted, every error is considered matching. | |
| 512 * | |
| 513 * If the error is intercepted, the [handle] function can decide what to do | |
| 514 * with it. It can throw if it wants to raise a new (or the same) error, | |
| 515 * or simply return to make the stream forget the error. | |
| 516 * | |
| 517 * If you need to transform an error into a data event, use the more generic | |
| 518 * [Stream.transform] to handle the event by writing a data event to | |
| 519 * the output sink. | |
| 520 * | |
| 521 * The returned stream is a broadcast stream if this stream is. | |
| 522 * If a broadcast stream is listened to more than once, each subscription | |
| 523 * will individually perform the `test` and handle the error. | |
| 524 */ | |
| 525 Stream<T> handleError(Function onError, { bool test(error) }) { | |
| 526 return new _HandleErrorStream<T>(this, onError, test); | |
| 527 } | |
| 528 | |
| 529 /** | |
| 530 * Creates a new stream from this stream that converts each element | |
| 531 * into zero or more events. | |
| 532 * | |
| 533 * Each incoming event is converted to an [Iterable] of new events, | |
| 534 * and each of these new events are then sent by the returned stream | |
| 535 * in order. | |
| 536 * | |
| 537 * The returned stream is a broadcast stream if this stream is. | |
| 538 * If a broadcast stream is listened to more than once, each subscription | |
| 539 * will individually call `convert` and expand the events. | |
| 540 */ | |
| 541 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { | |
| 542 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | |
| 543 } | |
| 544 | |
| 545 /** | |
| 546 * Pipe the events of this stream into [streamConsumer]. | |
| 547 * | |
| 548 * The events of this stream are added to `streamConsumer` using | |
| 549 * [StreamConsumer.addStream]. | |
| 550 * The `streamConsumer` is closed when this stream has been successfully added | |
| 551 * to it - when the future returned by `addStream` completes without an error. | |
| 552 * | |
| 553 * Returns a future which completes when the stream has been consumed | |
| 554 * and the consumer has been closed. | |
| 555 * | |
| 556 * The returned future completes with the same result as the future returned | |
| 557 * by [StreamConsumer.close]. | |
| 558 * If the adding of the stream itself fails in some way, | |
| 559 * then the consumer is expected to be closed, and won't be closed again. | |
| 560 * In that case the returned future completes with the error from calling | |
| 561 * `addStream`. | |
| 562 */ | |
| 563 Future pipe(StreamConsumer<T> streamConsumer) { | |
| 564 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); | |
| 565 } | |
| 566 | |
| 567 /** | |
| 568 * Chains this stream as the input of the provided [StreamTransformer]. | |
| 569 * | |
| 570 * Returns the result of [:streamTransformer.bind:] itself. | |
| 571 * | |
| 572 * The `streamTransformer` can decide whether it wants to return a | |
| 573 * broadcast stream or not. | |
| 574 */ | |
| 575 Stream/*<S>*/ transform/*<S>*/( | |
| 576 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { | |
| 577 return streamTransformer.bind(this); | |
| 578 } | |
| 579 | |
| 580 /** | |
| 581 * Reduces a sequence of values by repeatedly applying [combine]. | |
| 582 */ | |
| 583 Future<T> reduce(T combine(T previous, T element)) { | |
| 584 _Future<T> result = new _Future<T>(); | |
| 585 bool seenFirst = false; | |
| 586 T value; | |
| 587 StreamSubscription subscription; | |
| 588 subscription = this.listen( | |
| 589 (T element) { | |
| 590 if (seenFirst) { | |
| 591 _runUserCode(() => combine(value, element), | |
| 592 (T newValue) { value = newValue; }, | |
| 593 _cancelAndErrorClosure(subscription, result)); | |
| 594 } else { | |
| 595 value = element; | |
| 596 seenFirst = true; | |
| 597 } | |
| 598 }, | |
| 599 onError: result._completeError, | |
| 600 onDone: () { | |
| 601 if (!seenFirst) { | |
| 602 try { | |
| 603 throw IterableElementError.noElement(); | |
| 604 } catch (e, s) { | |
| 605 _completeWithErrorCallback(result, e, s); | |
| 606 } | |
| 607 } else { | |
| 608 result._complete(value); | |
| 609 } | |
| 610 }, | |
| 611 cancelOnError: true | |
| 612 ); | |
| 613 return result; | |
| 614 } | |
| 615 | |
| 616 /** Reduces a sequence of values by repeatedly applying [combine]. */ | |
| 617 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, | |
| 618 /*=S*/ combine(var/*=S*/ previous, T element)) { | |
| 619 | |
| 620 _Future/*<S>*/ result = new _Future/*<S>*/(); | |
| 621 var/*=S*/ value = initialValue; | |
| 622 StreamSubscription subscription; | |
| 623 subscription = this.listen( | |
| 624 (T element) { | |
| 625 _runUserCode( | |
| 626 () => combine(value, element), | |
| 627 (/*=S*/ newValue) { value = newValue; }, | |
| 628 _cancelAndErrorClosure(subscription, result) | |
| 629 ); | |
| 630 }, | |
| 631 onError: (e, st) { | |
| 632 result._completeError(e, st); | |
| 633 }, | |
| 634 onDone: () { | |
| 635 result._complete(value); | |
| 636 }, | |
| 637 cancelOnError: true); | |
| 638 return result; | |
| 639 } | |
| 640 | |
| 641 /** | |
| 642 * Collects string of data events' string representations. | |
| 643 * | |
| 644 * If [separator] is provided, it is inserted between any two | |
| 645 * elements. | |
| 646 * | |
| 647 * Any error in the stream causes the future to complete with that | |
| 648 * error. Otherwise it completes with the collected string when | |
| 649 * the "done" event arrives. | |
| 650 */ | |
| 651 Future<String> join([String separator = ""]) { | |
| 652 _Future<String> result = new _Future<String>(); | |
| 653 StringBuffer buffer = new StringBuffer(); | |
| 654 StreamSubscription subscription; | |
| 655 bool first = true; | |
| 656 subscription = this.listen( | |
| 657 (T element) { | |
| 658 if (!first) { | |
| 659 buffer.write(separator); | |
| 660 } | |
| 661 first = false; | |
| 662 try { | |
| 663 buffer.write(element); | |
| 664 } catch (e, s) { | |
| 665 _cancelAndErrorWithReplacement(subscription, result, e, s); | |
| 666 } | |
| 667 }, | |
| 668 onError: (e) { | |
| 669 result._completeError(e); | |
| 670 }, | |
| 671 onDone: () { | |
| 672 result._complete(buffer.toString()); | |
| 673 }, | |
| 674 cancelOnError: true); | |
| 675 return result; | |
| 676 } | |
| 677 | |
| 678 /** | |
| 679 * Checks whether [needle] occurs in the elements provided by this stream. | |
| 680 * | |
| 681 * Completes the [Future] when the answer is known. | |
| 682 * If this stream reports an error, the [Future] will report that error. | |
| 683 */ | |
| 684 Future<bool> contains(Object needle) { | |
| 685 _Future<bool> future = new _Future<bool>(); | |
| 686 StreamSubscription subscription; | |
| 687 subscription = this.listen( | |
| 688 (T element) { | |
| 689 _runUserCode( | |
| 690 () => (element == needle), | |
| 691 (bool isMatch) { | |
| 692 if (isMatch) { | |
| 693 _cancelAndValue(subscription, future, true); | |
| 694 } | |
| 695 }, | |
| 696 _cancelAndErrorClosure(subscription, future) | |
| 697 ); | |
| 698 }, | |
| 699 onError: future._completeError, | |
| 700 onDone: () { | |
| 701 future._complete(false); | |
| 702 }, | |
| 703 cancelOnError: true); | |
| 704 return future; | |
| 705 } | |
| 706 | |
| 707 /** | |
| 708 * Executes [action] on each data event of the stream. | |
| 709 * | |
| 710 * Completes the returned [Future] when all events of the stream | |
| 711 * have been processed. Completes the future with an error if the | |
| 712 * stream has an error event, or if [action] throws. | |
| 713 */ | |
| 714 Future forEach(void action(T element)) { | |
| 715 _Future future = new _Future(); | |
| 716 StreamSubscription subscription; | |
| 717 subscription = this.listen( | |
| 718 (T element) { | |
| 719 _runUserCode( | |
| 720 () => action(element), | |
| 721 (_) {}, | |
| 722 _cancelAndErrorClosure(subscription, future) | |
| 723 ); | |
| 724 }, | |
| 725 onError: future._completeError, | |
| 726 onDone: () { | |
| 727 future._complete(null); | |
| 728 }, | |
| 729 cancelOnError: true); | |
| 730 return future; | |
| 731 } | |
| 732 | |
| 733 /** | |
| 734 * Checks whether [test] accepts all elements provided by this stream. | |
| 735 * | |
| 736 * Completes the [Future] when the answer is known. | |
| 737 * If this stream reports an error, the [Future] will report that error. | |
| 738 */ | |
| 739 Future<bool> every(bool test(T element)) { | |
| 740 _Future<bool> future = new _Future<bool>(); | |
| 741 StreamSubscription subscription; | |
| 742 subscription = this.listen( | |
| 743 (T element) { | |
| 744 _runUserCode( | |
| 745 () => test(element), | |
| 746 (bool isMatch) { | |
| 747 if (!isMatch) { | |
| 748 _cancelAndValue(subscription, future, false); | |
| 749 } | |
| 750 }, | |
| 751 _cancelAndErrorClosure(subscription, future) | |
| 752 ); | |
| 753 }, | |
| 754 onError: future._completeError, | |
| 755 onDone: () { | |
| 756 future._complete(true); | |
| 757 }, | |
| 758 cancelOnError: true); | |
| 759 return future; | |
| 760 } | |
| 761 | |
| 762 /** | |
| 763 * Checks whether [test] accepts any element provided by this stream. | |
| 764 * | |
| 765 * Completes the [Future] when the answer is known. | |
| 766 * | |
| 767 * If this stream reports an error, the [Future] reports that error. | |
| 768 * | |
| 769 * Stops listening to the stream after the first matching element has been | |
| 770 * found. | |
| 771 * | |
| 772 * Internally the method cancels its subscription after this element. This | |
| 773 * means that single-subscription (non-broadcast) streams are closed and | |
| 774 * cannot be reused after a call to this method. | |
| 775 */ | |
| 776 Future<bool> any(bool test(T element)) { | |
| 777 _Future<bool> future = new _Future<bool>(); | |
| 778 StreamSubscription subscription; | |
| 779 subscription = this.listen( | |
| 780 (T element) { | |
| 781 _runUserCode( | |
| 782 () => test(element), | |
| 783 (bool isMatch) { | |
| 784 if (isMatch) { | |
| 785 _cancelAndValue(subscription, future, true); | |
| 786 } | |
| 787 }, | |
| 788 _cancelAndErrorClosure(subscription, future) | |
| 789 ); | |
| 790 }, | |
| 791 onError: future._completeError, | |
| 792 onDone: () { | |
| 793 future._complete(false); | |
| 794 }, | |
| 795 cancelOnError: true); | |
| 796 return future; | |
| 797 } | |
| 798 | |
| 799 | |
| 800 /** Counts the elements in the stream. */ | |
| 801 Future<int> get length { | |
| 802 _Future<int> future = new _Future<int>(); | |
| 803 int count = 0; | |
| 804 this.listen( | |
| 805 (_) { count++; }, | |
| 806 onError: future._completeError, | |
| 807 onDone: () { | |
| 808 future._complete(count); | |
| 809 }, | |
| 810 cancelOnError: true); | |
| 811 return future; | |
| 812 } | |
| 813 | |
| 814 /** | |
| 815 * Reports whether this stream contains any elements. | |
| 816 * | |
| 817 * Stops listening to the stream after the first element has been received. | |
| 818 * | |
| 819 * Internally the method cancels its subscription after the first element. | |
| 820 * This means that single-subscription (non-broadcast) streams are closed and | |
| 821 * cannot be reused after a call to this getter. | |
| 822 */ | |
| 823 Future<bool> get isEmpty { | |
| 824 _Future<bool> future = new _Future<bool>(); | |
| 825 StreamSubscription subscription; | |
| 826 subscription = this.listen( | |
| 827 (_) { | |
| 828 _cancelAndValue(subscription, future, false); | |
| 829 }, | |
| 830 onError: future._completeError, | |
| 831 onDone: () { | |
| 832 future._complete(true); | |
| 833 }, | |
| 834 cancelOnError: true); | |
| 835 return future; | |
| 836 } | |
| 837 | |
| 838 /** Collects the data of this stream in a [List]. */ | |
| 839 Future<List<T>> toList() { | |
| 840 List<T> result = <T>[]; | |
| 841 _Future<List<T>> future = new _Future<List<T>>(); | |
| 842 this.listen( | |
| 843 (T data) { | |
| 844 result.add(data); | |
| 845 }, | |
| 846 onError: future._completeError, | |
| 847 onDone: () { | |
| 848 future._complete(result); | |
| 849 }, | |
| 850 cancelOnError: true); | |
| 851 return future; | |
| 852 } | |
| 853 | |
| 854 /** | |
| 855 * Collects the data of this stream in a [Set]. | |
| 856 * | |
| 857 * The returned set is the same type as returned by `new Set<T>()`. | |
| 858 * If another type of set is needed, either use [forEach] to add each | |
| 859 * element to the set, or use | |
| 860 * `toList().then((list) => new SomeOtherSet.from(list))` | |
| 861 * to create the set. | |
| 862 */ | |
| 863 Future<Set<T>> toSet() { | |
| 864 Set<T> result = new Set<T>(); | |
| 865 _Future<Set<T>> future = new _Future<Set<T>>(); | |
| 866 this.listen( | |
| 867 (T data) { | |
| 868 result.add(data); | |
| 869 }, | |
| 870 onError: future._completeError, | |
| 871 onDone: () { | |
| 872 future._complete(result); | |
| 873 }, | |
| 874 cancelOnError: true); | |
| 875 return future; | |
| 876 } | |
| 877 | |
| 878 /** | |
| 879 * Discards all data on the stream, but signals when it's done or an error | |
| 880 * occured. | |
| 881 * | |
| 882 * When subscribing using [drain], cancelOnError will be true. This means | |
| 883 * that the future will complete with the first error on the stream and then | |
| 884 * cancel the subscription. | |
| 885 * | |
| 886 * In case of a `done` event the future completes with the given | |
| 887 * [futureValue]. | |
| 888 */ | |
| 889 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) | |
| 890 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); | |
| 891 | |
| 892 /** | |
| 893 * Provides at most the first [n] values of this stream. | |
| 894 * | |
| 895 * Forwards the first [n] data events of this stream, and all error | |
| 896 * events, to the returned stream, and ends with a done event. | |
| 897 * | |
| 898 * If this stream produces fewer than [count] values before it's done, | |
| 899 * so will the returned stream. | |
| 900 * | |
| 901 * Stops listening to the stream after the first [n] elements have been | |
| 902 * received. | |
| 903 * | |
| 904 * Internally the method cancels its subscription after these elements. This | |
| 905 * means that single-subscription (non-broadcast) streams are closed and | |
| 906 * cannot be reused after a call to this method. | |
| 907 * | |
| 908 * The returned stream is a broadcast stream if this stream is. | |
| 909 * For a broadcast stream, the events are only counted from the time | |
| 910 * the returned stream is listened to. | |
| 911 */ | |
| 912 Stream<T> take(int count) { | |
| 913 return new _TakeStream<T>(this, count); | |
| 914 } | |
| 915 | |
| 916 /** | |
| 917 * Forwards data events while [test] is successful. | |
| 918 * | |
| 919 * The returned stream provides the same events as this stream as long | |
| 920 * as [test] returns [:true:] for the event data. The stream is done | |
| 921 * when either this stream is done, or when this stream first provides | |
| 922 * a value that [test] doesn't accept. | |
| 923 * | |
| 924 * Stops listening to the stream after the accepted elements. | |
| 925 * | |
| 926 * Internally the method cancels its subscription after these elements. This | |
| 927 * means that single-subscription (non-broadcast) streams are closed and | |
| 928 * cannot be reused after a call to this method. | |
| 929 * | |
| 930 * The returned stream is a broadcast stream if this stream is. | |
| 931 * For a broadcast stream, the events are only tested from the time | |
| 932 * the returned stream is listened to. | |
| 933 */ | |
| 934 Stream<T> takeWhile(bool test(T element)) { | |
| 935 return new _TakeWhileStream<T>(this, test); | |
| 936 } | |
| 937 | |
| 938 /** | |
| 939 * Skips the first [count] data events from this stream. | |
| 940 * | |
| 941 * The returned stream is a broadcast stream if this stream is. | |
| 942 * For a broadcast stream, the events are only counted from the time | |
| 943 * the returned stream is listened to. | |
| 944 */ | |
| 945 Stream<T> skip(int count) { | |
| 946 return new _SkipStream<T>(this, count); | |
| 947 } | |
| 948 | |
| 949 /** | |
| 950 * Skip data events from this stream while they are matched by [test]. | |
| 951 * | |
| 952 * Error and done events are provided by the returned stream unmodified. | |
| 953 * | |
| 954 * Starting with the first data event where [test] returns false for the | |
| 955 * event data, the returned stream will have the same events as this stream. | |
| 956 * | |
| 957 * The returned stream is a broadcast stream if this stream is. | |
| 958 * For a broadcast stream, the events are only tested from the time | |
| 959 * the returned stream is listened to. | |
| 960 */ | |
| 961 Stream<T> skipWhile(bool test(T element)) { | |
| 962 return new _SkipWhileStream<T>(this, test); | |
| 963 } | |
| 964 | |
| 965 /** | |
| 966 * Skips data events if they are equal to the previous data event. | |
| 967 * | |
| 968 * The returned stream provides the same events as this stream, except | |
| 969 * that it never provides two consecutive data events that are equal. | |
| 970 * | |
| 971 * Equality is determined by the provided [equals] method. If that is | |
| 972 * omitted, the '==' operator on the last provided data element is used. | |
| 973 * | |
| 974 * The returned stream is a broadcast stream if this stream is. | |
| 975 * If a broadcast stream is listened to more than once, each subscription | |
| 976 * will individually perform the `equals` test. | |
| 977 */ | |
| 978 Stream<T> distinct([bool equals(T previous, T next)]) { | |
| 979 return new _DistinctStream<T>(this, equals); | |
| 980 } | |
| 981 | |
| 982 /** | |
| 983 * Returns the first element of the stream. | |
| 984 * | |
| 985 * Stops listening to the stream after the first element has been received. | |
| 986 * | |
| 987 * Internally the method cancels its subscription after the first element. | |
| 988 * This means that single-subscription (non-broadcast) streams are closed | |
| 989 * and cannot be reused after a call to this getter. | |
| 990 * | |
| 991 * If an error event occurs before the first data event, the resulting future | |
| 992 * is completed with that error. | |
| 993 * | |
| 994 * If this stream is empty (a done event occurs before the first data event), | |
| 995 * the resulting future completes with a [StateError]. | |
| 996 * | |
| 997 * Except for the type of the error, this method is equivalent to | |
| 998 * [:this.elementAt(0):]. | |
| 999 */ | |
| 1000 Future<T> get first { | |
| 1001 _Future<T> future = new _Future<T>(); | |
| 1002 StreamSubscription subscription; | |
| 1003 subscription = this.listen( | |
| 1004 (T value) { | |
| 1005 _cancelAndValue(subscription, future, value); | |
| 1006 }, | |
| 1007 onError: future._completeError, | |
| 1008 onDone: () { | |
| 1009 try { | |
| 1010 throw IterableElementError.noElement(); | |
| 1011 } catch (e, s) { | |
| 1012 _completeWithErrorCallback(future, e, s); | |
| 1013 } | |
| 1014 }, | |
| 1015 cancelOnError: true); | |
| 1016 return future; | |
| 1017 } | |
| 1018 | |
| 1019 /** | |
| 1020 * Returns the last element of the stream. | |
| 1021 * | |
| 1022 * If an error event occurs before the first data event, the resulting future | |
| 1023 * is completed with that error. | |
| 1024 * | |
| 1025 * If this stream is empty (a done event occurs before the first data event), | |
| 1026 * the resulting future completes with a [StateError]. | |
| 1027 */ | |
| 1028 Future<T> get last { | |
| 1029 _Future<T> future = new _Future<T>(); | |
| 1030 T result = null; | |
| 1031 bool foundResult = false; | |
| 1032 listen( | |
| 1033 (T value) { | |
| 1034 foundResult = true; | |
| 1035 result = value; | |
| 1036 }, | |
| 1037 onError: future._completeError, | |
| 1038 onDone: () { | |
| 1039 if (foundResult) { | |
| 1040 future._complete(result); | |
| 1041 return; | |
| 1042 } | |
| 1043 try { | |
| 1044 throw IterableElementError.noElement(); | |
| 1045 } catch (e, s) { | |
| 1046 _completeWithErrorCallback(future, e, s); | |
| 1047 } | |
| 1048 }, | |
| 1049 cancelOnError: true); | |
| 1050 return future; | |
| 1051 } | |
| 1052 | |
| 1053 /** | |
| 1054 * Returns the single element. | |
| 1055 * | |
| 1056 * If an error event occurs before or after the first data event, the | |
| 1057 * resulting future is completed with that error. | |
| 1058 * | |
| 1059 * If [this] is empty or has more than one element throws a [StateError]. | |
| 1060 */ | |
| 1061 Future<T> get single { | |
| 1062 _Future<T> future = new _Future<T>(); | |
| 1063 T result = null; | |
| 1064 bool foundResult = false; | |
| 1065 StreamSubscription subscription; | |
| 1066 subscription = this.listen( | |
| 1067 (T value) { | |
| 1068 if (foundResult) { | |
| 1069 // This is the second element we get. | |
| 1070 try { | |
| 1071 throw IterableElementError.tooMany(); | |
| 1072 } catch (e, s) { | |
| 1073 _cancelAndErrorWithReplacement(subscription, future, e, s); | |
| 1074 } | |
| 1075 return; | |
| 1076 } | |
| 1077 foundResult = true; | |
| 1078 result = value; | |
| 1079 }, | |
| 1080 onError: future._completeError, | |
| 1081 onDone: () { | |
| 1082 if (foundResult) { | |
| 1083 future._complete(result); | |
| 1084 return; | |
| 1085 } | |
| 1086 try { | |
| 1087 throw IterableElementError.noElement(); | |
| 1088 } catch (e, s) { | |
| 1089 _completeWithErrorCallback(future, e, s); | |
| 1090 } | |
| 1091 }, | |
| 1092 cancelOnError: true); | |
| 1093 return future; | |
| 1094 } | |
| 1095 | |
| 1096 /** | |
| 1097 * Finds the first element of this stream matching [test]. | |
| 1098 * | |
| 1099 * Returns a future that is filled with the first element of this stream | |
| 1100 * that [test] returns true for. | |
| 1101 * | |
| 1102 * If no such element is found before this stream is done, and a | |
| 1103 * [defaultValue] function is provided, the result of calling [defaultValue] | |
| 1104 * becomes the value of the future. | |
| 1105 * | |
| 1106 * Stops listening to the stream after the first matching element has been | |
| 1107 * received. | |
| 1108 * | |
| 1109 * Internally the method cancels its subscription after the first element that | |
| 1110 * matches the predicate. This means that single-subscription (non-broadcast) | |
| 1111 * streams are closed and cannot be reused after a call to this method. | |
| 1112 * | |
| 1113 * If an error occurs, or if this stream ends without finding a match and | |
| 1114 * with no [defaultValue] function provided, the future will receive an | |
| 1115 * error. | |
| 1116 */ | |
| 1117 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | |
| 1118 _Future<dynamic> future = new _Future(); | |
| 1119 StreamSubscription subscription; | |
| 1120 subscription = this.listen( | |
| 1121 (T value) { | |
| 1122 _runUserCode( | |
| 1123 () => test(value), | |
| 1124 (bool isMatch) { | |
| 1125 if (isMatch) { | |
| 1126 _cancelAndValue(subscription, future, value); | |
| 1127 } | |
| 1128 }, | |
| 1129 _cancelAndErrorClosure(subscription, future) | |
| 1130 ); | |
| 1131 }, | |
| 1132 onError: future._completeError, | |
| 1133 onDone: () { | |
| 1134 if (defaultValue != null) { | |
| 1135 _runUserCode(defaultValue, future._complete, future._completeError); | |
| 1136 return; | |
| 1137 } | |
| 1138 try { | |
| 1139 throw IterableElementError.noElement(); | |
| 1140 } catch (e, s) { | |
| 1141 _completeWithErrorCallback(future, e, s); | |
| 1142 } | |
| 1143 }, | |
| 1144 cancelOnError: true); | |
| 1145 return future; | |
| 1146 } | |
| 1147 | |
| 1148 /** | |
| 1149 * Finds the last element in this stream matching [test]. | |
| 1150 * | |
| 1151 * As [firstWhere], except that the last matching element is found. | |
| 1152 * That means that the result cannot be provided before this stream | |
| 1153 * is done. | |
| 1154 */ | |
| 1155 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | |
| 1156 _Future<dynamic> future = new _Future(); | |
| 1157 T result = null; | |
| 1158 bool foundResult = false; | |
| 1159 StreamSubscription subscription; | |
| 1160 subscription = this.listen( | |
| 1161 (T value) { | |
| 1162 _runUserCode( | |
| 1163 () => true == test(value), | |
| 1164 (bool isMatch) { | |
| 1165 if (isMatch) { | |
| 1166 foundResult = true; | |
| 1167 result = value; | |
| 1168 } | |
| 1169 }, | |
| 1170 _cancelAndErrorClosure(subscription, future) | |
| 1171 ); | |
| 1172 }, | |
| 1173 onError: future._completeError, | |
| 1174 onDone: () { | |
| 1175 if (foundResult) { | |
| 1176 future._complete(result); | |
| 1177 return; | |
| 1178 } | |
| 1179 if (defaultValue != null) { | |
| 1180 _runUserCode(defaultValue, future._complete, future._completeError); | |
| 1181 return; | |
| 1182 } | |
| 1183 try { | |
| 1184 throw IterableElementError.noElement(); | |
| 1185 } catch (e, s) { | |
| 1186 _completeWithErrorCallback(future, e, s); | |
| 1187 } | |
| 1188 }, | |
| 1189 cancelOnError: true); | |
| 1190 return future; | |
| 1191 } | |
| 1192 | |
| 1193 /** | |
| 1194 * Finds the single element in this stream matching [test]. | |
| 1195 * | |
| 1196 * Like [lastMatch], except that it is an error if more than one | |
| 1197 * matching element occurs in the stream. | |
| 1198 */ | |
| 1199 Future<T> singleWhere(bool test(T element)) { | |
| 1200 _Future<T> future = new _Future<T>(); | |
| 1201 T result = null; | |
| 1202 bool foundResult = false; | |
| 1203 StreamSubscription subscription; | |
| 1204 subscription = this.listen( | |
| 1205 (T value) { | |
| 1206 _runUserCode( | |
| 1207 () => true == test(value), | |
| 1208 (bool isMatch) { | |
| 1209 if (isMatch) { | |
| 1210 if (foundResult) { | |
| 1211 try { | |
| 1212 throw IterableElementError.tooMany(); | |
| 1213 } catch (e, s) { | |
| 1214 _cancelAndErrorWithReplacement(subscription, future, e, s); | |
| 1215 } | |
| 1216 return; | |
| 1217 } | |
| 1218 foundResult = true; | |
| 1219 result = value; | |
| 1220 } | |
| 1221 }, | |
| 1222 _cancelAndErrorClosure(subscription, future) | |
| 1223 ); | |
| 1224 }, | |
| 1225 onError: future._completeError, | |
| 1226 onDone: () { | |
| 1227 if (foundResult) { | |
| 1228 future._complete(result); | |
| 1229 return; | |
| 1230 } | |
| 1231 try { | |
| 1232 throw IterableElementError.noElement(); | |
| 1233 } catch (e, s) { | |
| 1234 _completeWithErrorCallback(future, e, s); | |
| 1235 } | |
| 1236 }, | |
| 1237 cancelOnError: true); | |
| 1238 return future; | |
| 1239 } | |
| 1240 | |
| 1241 /** | |
| 1242 * Returns the value of the [index]th data event of this stream. | |
| 1243 * | |
| 1244 * Stops listening to the stream after the [index]th data event has been | |
| 1245 * received. | |
| 1246 * | |
| 1247 * Internally the method cancels its subscription after these elements. This | |
| 1248 * means that single-subscription (non-broadcast) streams are closed and | |
| 1249 * cannot be reused after a call to this method. | |
| 1250 * | |
| 1251 * If an error event occurs before the value is found, the future completes | |
| 1252 * with this error. | |
| 1253 * | |
| 1254 * If a done event occurs before the value is found, the future completes | |
| 1255 * with a [RangeError]. | |
| 1256 */ | |
| 1257 Future<T> elementAt(int index) { | |
| 1258 if (index is! int || index < 0) throw new ArgumentError(index); | |
| 1259 _Future<T> future = new _Future<T>(); | |
| 1260 StreamSubscription subscription; | |
| 1261 int elementIndex = 0; | |
| 1262 subscription = this.listen( | |
| 1263 (T value) { | |
| 1264 if (index == elementIndex) { | |
| 1265 _cancelAndValue(subscription, future, value); | |
| 1266 return; | |
| 1267 } | |
| 1268 elementIndex += 1; | |
| 1269 }, | |
| 1270 onError: future._completeError, | |
| 1271 onDone: () { | |
| 1272 future._completeError( | |
| 1273 new RangeError.index(index, this, "index", null, elementIndex)); | |
| 1274 }, | |
| 1275 cancelOnError: true); | |
| 1276 return future; | |
| 1277 } | |
| 1278 | |
| 1279 /** | |
| 1280 * Creates a new stream with the same events as this stream. | |
| 1281 * | |
| 1282 * Whenever more than [timeLimit] passes between two events from this stream, | |
| 1283 * the [onTimeout] function is called. | |
| 1284 * | |
| 1285 * The countdown doesn't start until the returned stream is listened to. | |
| 1286 * The countdown is reset every time an event is forwarded from this stream, | |
| 1287 * or when the stream is paused and resumed. | |
| 1288 * | |
| 1289 * The [onTimeout] function is called with one argument: an | |
| 1290 * [EventSink] that allows putting events into the returned stream. | |
| 1291 * This `EventSink` is only valid during the call to `onTimeout`. | |
| 1292 * | |
| 1293 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException] | |
| 1294 * into the error channel of the returned stream. | |
| 1295 * | |
| 1296 * The returned stream is a broadcast stream if this stream is. | |
| 1297 * If a broadcast stream is listened to more than once, each subscription | |
| 1298 * will have its individually timer that starts counting on listen, | |
| 1299 * and the subscriptions' timers can be paused individually. | |
| 1300 */ | |
| 1301 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) { | |
| 1302 StreamController<T> controller; | |
| 1303 // The following variables are set on listen. | |
| 1304 StreamSubscription<T> subscription; | |
| 1305 Timer timer; | |
| 1306 Zone zone; | |
| 1307 _TimerCallback timeout; | |
| 1308 | |
| 1309 void onData(T event) { | |
| 1310 timer.cancel(); | |
| 1311 controller.add(event); | |
| 1312 timer = zone.createTimer(timeLimit, timeout); | |
| 1313 } | |
| 1314 void onError(error, StackTrace stackTrace) { | |
| 1315 timer.cancel(); | |
| 1316 assert(controller is _StreamController || | |
| 1317 controller is _BroadcastStreamController); | |
| 1318 dynamic eventSink = controller; | |
| 1319 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | |
| 1320 timer = zone.createTimer(timeLimit, timeout); | |
| 1321 } | |
| 1322 void onDone() { | |
| 1323 timer.cancel(); | |
| 1324 controller.close(); | |
| 1325 } | |
| 1326 void onListen() { | |
| 1327 // This is the onListen callback for of controller. | |
| 1328 // It runs in the same zone that the subscription was created in. | |
| 1329 // Use that zone for creating timers and running the onTimeout | |
| 1330 // callback. | |
| 1331 zone = Zone.current; | |
| 1332 if (onTimeout == null) { | |
| 1333 timeout = () { | |
| 1334 controller.addError(new TimeoutException("No stream event", | |
| 1335 timeLimit), null); | |
| 1336 }; | |
| 1337 } else { | |
| 1338 // TODO(floitsch): the return type should be 'void', and the type | |
| 1339 // should be inferred. | |
| 1340 var registeredOnTimeout = | |
| 1341 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); | |
| 1342 _ControllerEventSinkWrapper wrapper = | |
| 1343 new _ControllerEventSinkWrapper(null); | |
| 1344 timeout = () { | |
| 1345 wrapper._sink = controller; // Only valid during call. | |
| 1346 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | |
| 1347 wrapper._sink = null; | |
| 1348 }; | |
| 1349 } | |
| 1350 | |
| 1351 subscription = this.listen(onData, onError: onError, onDone: onDone); | |
| 1352 timer = zone.createTimer(timeLimit, timeout); | |
| 1353 } | |
| 1354 Future onCancel() { | |
| 1355 timer.cancel(); | |
| 1356 Future result = subscription.cancel(); | |
| 1357 subscription = null; | |
| 1358 return result; | |
| 1359 } | |
| 1360 controller = isBroadcast | |
| 1361 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | |
| 1362 : new _SyncStreamController<T>( | |
| 1363 onListen, | |
| 1364 () { | |
| 1365 // Don't null the timer, onCancel may call cancel again. | |
| 1366 timer.cancel(); | |
| 1367 subscription.pause(); | |
| 1368 }, | |
| 1369 () { | |
| 1370 subscription.resume(); | |
| 1371 timer = zone.createTimer(timeLimit, timeout); | |
| 1372 }, | |
| 1373 onCancel); | |
| 1374 return controller.stream; | |
| 1375 } | |
| 1376 } | |
| 1377 | |
| 1378 /** | |
| 1379 * A subscription on events from a [Stream]. | |
| 1380 * | |
| 1381 * When you listen on a [Stream] using [Stream.listen], | |
| 1382 * a [StreamSubscription] object is returned. | |
| 1383 * | |
| 1384 * The subscription provides events to the listener, | |
| 1385 * and holds the callbacks used to handle the events. | |
| 1386 * The subscription can also be used to unsubscribe from the events, | |
| 1387 * or to temporarily pause the events from the stream. | |
| 1388 */ | |
| 1389 abstract class StreamSubscription<T> { | |
| 1390 /** | |
| 1391 * Cancels this subscription. | |
| 1392 * | |
| 1393 * After this call, the subscription no longer receives events. | |
| 1394 * | |
| 1395 * The stream may need to shut down the source of events and clean up after | |
| 1396 * the subscription is canceled. | |
| 1397 * | |
| 1398 * Returns a future that is completed once the stream has finished | |
| 1399 * its cleanup. May also return `null` if no cleanup was necessary. | |
| 1400 * | |
| 1401 * Typically, futures are returned when the stream needs to release resources. | |
| 1402 * For example, a stream might need to close an open file (as an asynchronous | |
| 1403 * operation). If the listener wants to delete the file after having | |
| 1404 * canceled the subscription, it must wait for the cleanup future to complete. | |
| 1405 * | |
| 1406 * A returned future completes with a `null` value. | |
| 1407 * If the cleanup throws, which it really shouldn't, the returned future | |
| 1408 * completes with that error. | |
| 1409 */ | |
| 1410 Future cancel(); | |
| 1411 | |
| 1412 /** | |
| 1413 * Set or override the data event handler of this subscription. | |
| 1414 * | |
| 1415 * This method overrides the handler that has been set at the invocation of | |
| 1416 * [Stream.listen]. | |
| 1417 */ | |
| 1418 void onData(void handleData(T data)); | |
| 1419 | |
| 1420 /** | |
| 1421 * Set or override the error event handler of this subscription. | |
| 1422 * | |
| 1423 * This method overrides the handler that has been set at the invocation of | |
| 1424 * [Stream.listen] or by calling [asFuture]. | |
| 1425 */ | |
| 1426 void onError(Function handleError); | |
| 1427 | |
| 1428 /** | |
| 1429 * Set or override the done event handler of this subscription. | |
| 1430 * | |
| 1431 * This method overrides the handler that has been set at the invocation of | |
| 1432 * [Stream.listen] or by calling [asFuture]. | |
| 1433 */ | |
| 1434 void onDone(void handleDone()); | |
| 1435 | |
| 1436 /** | |
| 1437 * Request that the stream pauses events until further notice. | |
| 1438 * | |
| 1439 * While paused, the subscription will not fire any events. | |
| 1440 * If it receives events from its source, they will be buffered until | |
| 1441 * the subscription is resumed. | |
| 1442 * The underlying source is usually informed about the pause, | |
| 1443 * so it can stop generating events until the subscription is resumed. | |
| 1444 * | |
| 1445 * To avoid buffering events on a broadcast stream, it is better to | |
| 1446 * cancel this subscription, and start to listen again when events | |
| 1447 * are needed. | |
| 1448 * | |
| 1449 * If [resumeSignal] is provided, the stream will undo the pause | |
| 1450 * when the future completes. If the future completes with an error, | |
| 1451 * the stream will resume, but the error will not be handled! | |
| 1452 * | |
| 1453 * A call to [resume] will also undo a pause. | |
| 1454 * | |
| 1455 * If the subscription is paused more than once, an equal number | |
| 1456 * of resumes must be performed to resume the stream. | |
| 1457 * | |
| 1458 * Currently DOM streams silently drop events when the stream is paused. This | |
| 1459 * is a bug and will be fixed. | |
| 1460 */ | |
| 1461 void pause([Future resumeSignal]); | |
| 1462 | |
| 1463 /** | |
| 1464 * Resume after a pause. | |
| 1465 */ | |
| 1466 void resume(); | |
| 1467 | |
| 1468 /** | |
| 1469 * Returns true if the [StreamSubscription] is paused. | |
| 1470 */ | |
| 1471 bool get isPaused; | |
| 1472 | |
| 1473 /** | |
| 1474 * Returns a future that handles the [onDone] and [onError] callbacks. | |
| 1475 * | |
| 1476 * This method *overwrites* the existing [onDone] and [onError] callbacks | |
| 1477 * with new ones that complete the returned future. | |
| 1478 * | |
| 1479 * In case of an error the subscription will automatically cancel (even | |
| 1480 * when it was listening with `cancelOnError` set to `false`). | |
| 1481 * | |
| 1482 * In case of a `done` event the future completes with the given | |
| 1483 * [futureValue]. | |
| 1484 */ | |
| 1485 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); | |
| 1486 } | |
| 1487 | |
| 1488 | |
| 1489 /** | |
| 1490 * An interface that abstracts creation or handling of [Stream] events. | |
| 1491 */ | |
| 1492 abstract class EventSink<T> implements Sink<T> { | |
| 1493 /** Send a data event to a stream. */ | |
| 1494 void add(T event); | |
| 1495 | |
| 1496 /** Send an async error to a stream. */ | |
| 1497 void addError(errorEvent, [StackTrace stackTrace]); | |
| 1498 | |
| 1499 /** Close the sink. No further events can be added after closing. */ | |
| 1500 void close(); | |
| 1501 } | |
| 1502 | |
| 1503 | |
| 1504 /** [Stream] wrapper that only exposes the [Stream] interface. */ | |
| 1505 class StreamView<T> extends Stream<T> { | |
| 1506 final Stream<T> _stream; | |
| 1507 | |
| 1508 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); | |
| 1509 | |
| 1510 bool get isBroadcast => _stream.isBroadcast; | |
| 1511 | |
| 1512 Stream<T> asBroadcastStream( | |
| 1513 {void onListen(StreamSubscription<T> subscription), | |
| 1514 void onCancel(StreamSubscription<T> subscription)}) | |
| 1515 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | |
| 1516 | |
| 1517 StreamSubscription<T> listen(void onData(T value), | |
| 1518 { Function onError, | |
| 1519 void onDone(), | |
| 1520 bool cancelOnError }) { | |
| 1521 return _stream.listen(onData, onError: onError, onDone: onDone, | |
| 1522 cancelOnError: cancelOnError); | |
| 1523 } | |
| 1524 } | |
| 1525 | |
| 1526 | |
| 1527 /** | |
| 1528 * Abstract interface for a "sink" accepting multiple entire streams. | |
| 1529 * | |
| 1530 * A consumer can accept a number of consecutive streams using [addStream], | |
| 1531 * and when no further data need to be added, the [close] method tells the | |
| 1532 * consumer to complete its work and shut down. | |
| 1533 * | |
| 1534 * This class is not just a [Sink<Stream>] because it is also combined with | |
| 1535 * other [Sink] classes, like it's combined with [EventSink] in the | |
| 1536 * [StreamSink] class. | |
| 1537 * | |
| 1538 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream | |
| 1539 * to the consumer's [addStream] method. When that completes, it will | |
| 1540 * call [close] and then complete its own returned future. | |
| 1541 */ | |
| 1542 abstract class StreamConsumer<S> { | |
| 1543 /** | |
| 1544 * Consumes the elements of [stream]. | |
| 1545 * | |
| 1546 * Listens on [stream] and does something for each event. | |
| 1547 * | |
| 1548 * Returns a future which is completed when the stream is done being added, | |
| 1549 * and the consumer is ready to accept a new stream. | |
| 1550 * No further calls to [addStream] or [close] should happen before the | |
| 1551 * returned future has completed. | |
| 1552 * | |
| 1553 * The consumer may stop listening to the stream after an error, | |
| 1554 * it may consume all the errors and only stop at a done event, | |
| 1555 * or it may be canceled early if the receiver don't want any further events. | |
| 1556 * | |
| 1557 * If the consumer stops listening because of some error preventing it | |
| 1558 * from continuing, it may report this error in the returned future, | |
| 1559 * otherwise it will just complete the future with `null`. | |
| 1560 */ | |
| 1561 Future addStream(Stream<S> stream); | |
| 1562 | |
| 1563 /** | |
| 1564 * Tells the consumer that no further streams will be added. | |
| 1565 * | |
| 1566 * This allows the consumer to complete any remaining work and release | |
| 1567 * resources that are no longer needed | |
| 1568 * | |
| 1569 * Returns a future which is completed when the consumer has shut down. | |
| 1570 * If cleaning up can fail, the error may be reported in the returned future, | |
| 1571 * otherwise it completes with `null`. | |
| 1572 */ | |
| 1573 Future close(); | |
| 1574 } | |
| 1575 | |
| 1576 | |
| 1577 /** | |
| 1578 * A object that accepts stream events both synchronously and asynchronously. | |
| 1579 * | |
| 1580 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and | |
| 1581 * the synchronous methods from [EventSink]. | |
| 1582 * | |
| 1583 * The [EventSink] methods can't be used while the [addStream] is called. | |
| 1584 * As soon as the [addStream]'s [Future] completes with a value, the | |
| 1585 * [EventSink] methods can be used again. | |
| 1586 * | |
| 1587 * If [addStream] is called after any of the [EventSink] methods, it'll | |
| 1588 * be delayed until the underlying system has consumed the data added by the | |
| 1589 * [EventSink] methods. | |
| 1590 * | |
| 1591 * When [EventSink] methods are used, the [done] [Future] can be used to | |
| 1592 * catch any errors. | |
| 1593 * | |
| 1594 * When [close] is called, it will return the [done] [Future]. | |
| 1595 */ | |
| 1596 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { | |
| 1597 /** | |
| 1598 * Tells the stream sink that no further streams will be added. | |
| 1599 * | |
| 1600 * This allows the stream sink to complete any remaining work and release | |
| 1601 * resources that are no longer needed | |
| 1602 * | |
| 1603 * Returns a future which is completed when the stream sink has shut down. | |
| 1604 * If cleaning up can fail, the error may be reported in the returned future, | |
| 1605 * otherwise it completes with `null`. | |
| 1606 * | |
| 1607 * Returns the same future as [done]. | |
| 1608 * | |
| 1609 * The stream sink may close before the [close] method is called, either due | |
| 1610 * to an error or because it is itself providing events to someone who has | |
| 1611 * stopped listening. In that case, the [done] future is completed first, | |
| 1612 * and the `close` method will return the `done` future when called. | |
| 1613 * | |
| 1614 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their | |
| 1615 * object as not expecting any further events. | |
| 1616 */ | |
| 1617 Future close(); | |
| 1618 | |
| 1619 /** | |
| 1620 * Return a future which is completed when the [StreamSink] is finished. | |
| 1621 * | |
| 1622 * If the `StreamSink` fails with an error, | |
| 1623 * perhaps in response to adding events using [add], [addError] or [close], | |
| 1624 * the [done] future will complete with that error. | |
| 1625 * | |
| 1626 * Otherwise, the returned future will complete when either: | |
| 1627 * | |
| 1628 * * all events have been processed and the sink has been closed, or | |
| 1629 * * the sink has otherwise been stopped from handling more events | |
| 1630 * (for example by cancelling a stream subscription). | |
| 1631 */ | |
| 1632 Future get done; | |
| 1633 } | |
| 1634 | |
| 1635 | |
| 1636 /** | |
| 1637 * The target of a [Stream.transform] call. | |
| 1638 * | |
| 1639 * The [Stream.transform] call will pass itself to this object and then return | |
| 1640 * the resulting stream. | |
| 1641 * | |
| 1642 * It is good practice to write transformers that can be used multiple times. | |
| 1643 */ | |
| 1644 abstract class StreamTransformer<S, T> { | |
| 1645 /** | |
| 1646 * Creates a [StreamTransformer]. | |
| 1647 * | |
| 1648 * The returned instance takes responsibility of implementing ([bind]). | |
| 1649 * When the user invokes `bind` it returns a new "bound" stream. Only when | |
| 1650 * the user starts listening to the bound stream, the `listen` method | |
| 1651 * invokes the given closure [transformer]. | |
| 1652 * | |
| 1653 * The [transformer] closure receives the stream, that was bound, as argument | |
| 1654 * and returns a [StreamSubscription]. In almost all cases the closure | |
| 1655 * listens itself to the stream that is given as argument. | |
| 1656 * | |
| 1657 * The result of invoking the [transformer] closure is a [StreamSubscription]. | |
| 1658 * The bound stream-transformer (created by the `bind` method above) then sets | |
| 1659 * the handlers it received as part of the `listen` call. | |
| 1660 * | |
| 1661 * Conceptually this can be summarized as follows: | |
| 1662 * | |
| 1663 * 1. `var transformer = new StreamTransformer(transformerClosure);` | |
| 1664 * creates a `StreamTransformer` that supports the `bind` method. | |
| 1665 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` | |
| 1666 * and returns a bound stream that has a pointer to `stream`. | |
| 1667 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` | |
| 1668 * starts the listening and transformation. This is accomplished | |
| 1669 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with | |
| 1670 * the `stream` it captured: `transformerClosure(stream, b)`. | |
| 1671 * The result `subscription`, a [StreamSubscription], is then | |
| 1672 * updated to receive its handlers: `subscription.onData(f1)`, | |
| 1673 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription | |
| 1674 * is returned as result of the `listen` call. | |
| 1675 * | |
| 1676 * There are two common ways to create a StreamSubscription: | |
| 1677 * | |
| 1678 * 1. by creating a new class that implements [StreamSubscription]. | |
| 1679 * Note that the subscription should run callbacks in the [Zone] the | |
| 1680 * stream was listened to. | |
| 1681 * 2. by allocating a [StreamController] and to return the result of | |
| 1682 * listening to its stream. | |
| 1683 * | |
| 1684 * Example use of a duplicating transformer: | |
| 1685 * | |
| 1686 * stringStream.transform(new StreamTransformer<String, String>( | |
| 1687 * (Stream<String> input, bool cancelOnError) { | |
| 1688 * StreamController<String> controller; | |
| 1689 * StreamSubscription<String> subscription; | |
| 1690 * controller = new StreamController<String>( | |
| 1691 * onListen: () { | |
| 1692 * subscription = input.listen((data) { | |
| 1693 * // Duplicate the data. | |
| 1694 * controller.add(data); | |
| 1695 * controller.add(data); | |
| 1696 * }, | |
| 1697 * onError: controller.addError, | |
| 1698 * onDone: controller.close, | |
| 1699 * cancelOnError: cancelOnError); | |
| 1700 * }, | |
| 1701 * onPause: () { subscription.pause(); }, | |
| 1702 * onResume: () { subscription.resume(); }, | |
| 1703 * onCancel: () { subscription.cancel(); }, | |
| 1704 * sync: true); | |
| 1705 * return controller.stream.listen(null); | |
| 1706 * }); | |
| 1707 */ | |
| 1708 const factory StreamTransformer( | |
| 1709 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | |
| 1710 = _StreamSubscriptionTransformer<S, T>; | |
| 1711 | |
| 1712 /** | |
| 1713 * Creates a [StreamTransformer] that delegates events to the given functions. | |
| 1714 * | |
| 1715 * Example use of a duplicating transformer: | |
| 1716 * | |
| 1717 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | |
| 1718 * handleData: (String value, EventSink<String> sink) { | |
| 1719 * sink.add(value); | |
| 1720 * sink.add(value); // Duplicate the incoming events. | |
| 1721 * })); | |
| 1722 */ | |
| 1723 factory StreamTransformer.fromHandlers({ | |
| 1724 void handleData(S data, EventSink<T> sink), | |
| 1725 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | |
| 1726 void handleDone(EventSink<T> sink)}) | |
| 1727 = _StreamHandlerTransformer<S, T>; | |
| 1728 | |
| 1729 /** | |
| 1730 * Transform the incoming [stream]'s events. | |
| 1731 * | |
| 1732 * Creates a new stream. | |
| 1733 * When this stream is listened to, it will start listening on [stream], | |
| 1734 * and generate events on the new stream based on the events from [stream]. | |
| 1735 * | |
| 1736 * Subscriptions on the returned stream should propagate pause state | |
| 1737 * to the subscription on [stream]. | |
| 1738 */ | |
| 1739 Stream<T> bind(Stream<S> stream); | |
| 1740 } | |
| 1741 | |
| 1742 /** | |
| 1743 * An [Iterator] like interface for the values of a [Stream]. | |
| 1744 * | |
| 1745 * This wraps a [Stream] and a subscription on the stream. It listens | |
| 1746 * on the stream, and completes the future returned by [moveNext] when the | |
| 1747 * next value becomes available. | |
| 1748 */ | |
| 1749 abstract class StreamIterator<T> { | |
| 1750 | |
| 1751 /** Create a [StreamIterator] on [stream]. */ | |
| 1752 factory StreamIterator(Stream<T> stream) | |
| 1753 // TODO(lrn): use redirecting factory constructor when type | |
| 1754 // arguments are supported. | |
| 1755 => new _StreamIteratorImpl<T>(stream); | |
| 1756 | |
| 1757 /** | |
| 1758 * Wait for the next stream value to be available. | |
| 1759 * | |
| 1760 * Returns a future which will complete with either `true` or `false`. | |
| 1761 * Completing with `true` means that another event has been received and | |
| 1762 * can be read as [current]. | |
| 1763 * Completing with `false` means that the stream itearation is done and | |
| 1764 * no further events will ever be available. | |
| 1765 * The future may complete with an error, if the stream produces an error, | |
| 1766 * which also ends iteration. | |
| 1767 * | |
| 1768 * The function must not be called again until the future returned by a | |
| 1769 * previous call is completed. | |
| 1770 */ | |
| 1771 Future<bool> moveNext(); | |
| 1772 | |
| 1773 /** | |
| 1774 * The current value of the stream. | |
| 1775 * | |
| 1776 * Is `null` before the first call to [moveNext] and after a call to | |
| 1777 * `moveNext` completes with a `false` result or an error. | |
| 1778 * | |
| 1779 * When a `moveNext` call completes with `true`, the `current` field holds | |
| 1780 * the most recent event of the stream, and it stays like that until the next | |
| 1781 * call to `moveNext`. | |
| 1782 * Between a call to `moveNext` and when its returned future completes, | |
| 1783 * the value is unspecified. | |
| 1784 */ | |
| 1785 T get current; | |
| 1786 | |
| 1787 /** | |
| 1788 * Cancels the stream iterator (and the underlying stream subscription) early. | |
| 1789 * | |
| 1790 * The stream iterator is automatically canceled if the [moveNext] future | |
| 1791 * completes with either `false` or an error. | |
| 1792 * | |
| 1793 * If you need to stop listening for values before the stream iterator is | |
| 1794 * automatically closed, you must call [cancel] to ensure that the stream | |
| 1795 * is properly closed. | |
| 1796 * | |
| 1797 * If [moveNext] has been called when the iterator is cancelled, | |
| 1798 * its returned future will complete with `false` as value, | |
| 1799 * as will all further calls to [moveNext]. | |
| 1800 * | |
| 1801 * Returns a future if the cancel-operation is not completed synchronously. | |
| 1802 * Otherwise returns `null`. | |
| 1803 */ | |
| 1804 Future cancel(); | |
| 1805 } | |
| 1806 | |
| 1807 | |
| 1808 /** | |
| 1809 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | |
| 1810 */ | |
| 1811 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | |
| 1812 EventSink _sink; | |
| 1813 _ControllerEventSinkWrapper(this._sink); | |
| 1814 | |
| 1815 void add(T data) { _sink.add(data); } | |
| 1816 void addError(error, [StackTrace stackTrace]) { | |
| 1817 _sink.addError(error, stackTrace); | |
| 1818 } | |
| 1819 void close() { _sink.close(); } | |
| 1820 } | |
| OLD | NEW |