OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.async; | |
6 | |
7 // ------------------------------------------------------------------- | |
8 // Core Stream types | |
9 // ------------------------------------------------------------------- | |
10 | |
11 typedef void _TimerCallback(); | |
12 | |
13 /** | |
14 * A source of asynchronous data events. | |
15 * | |
16 * A Stream provides a way to receive a sequence of events. | |
17 * Each event is either a data event or an error event, | |
18 * representing the result of a single computation. | |
19 * When the events provided by a Stream have all been sent, | |
20 * a single "done" event will mark the end. | |
21 * | |
22 * You can [listen] on a stream to make it start generating events, | |
23 * and to set up listeners that receive the events. | |
24 * When you listen, you receive a [StreamSubscription] object | |
25 * which is the active object providing the events, | |
26 * and which can be used to stop listening again, | |
27 * or to temporarily pause events from the subscription. | |
28 * | |
29 * There are two kinds of streams: "Single-subscription" streams and | |
30 * "broadcast" streams. | |
31 * | |
32 * *A single-subscription stream* allows only a single listener during the whole | |
33 * lifetime of the stream. | |
34 * It doesn't start generating events until it has a listener, | |
35 * and it stops sending events when the listener is unsubscribed, | |
36 * even if the source of events could still provide more. | |
37 * | |
38 * Listening twice on a single-subscription stream is not allowed, even after | |
39 * the first subscription has been canceled. | |
40 * | |
41 * Single-subscription streams are generally used for streaming chunks of | |
42 * larger contiguous data like file I/O. | |
43 * | |
44 * *A broadcast stream* allows any number of listeners, and it fires | |
45 * its events when they are ready, whether there are listeners or not. | |
46 * | |
47 * Broadcast streams are used for independent events/observers. | |
48 * | |
49 * If several listeners want to listen to a single subscription stream, | |
50 * use [asBroadcastStream] to create a broadcast stream on top of the | |
51 * non-broadcast stream. | |
52 * | |
53 * On either kind of stream, stream transformations, such as [where] and | |
54 * [skip], return the same type of stream as the one the method was called on, | |
55 * unless otherwise noted. | |
56 * | |
57 * When an event is fired, the listener(s) at that time will receive the event. | |
58 * If a listener is added to a broadcast stream while an event is being fired, | |
59 * that listener will not receive the event currently being fired. | |
60 * If a listener is canceled, it immediately stops receiving events. | |
61 * | |
62 * When the "done" event is fired, subscribers are unsubscribed before | |
63 * receiving the event. After the event has been sent, the stream has no | |
64 * subscribers. Adding new subscribers to a broadcast stream after this point | |
65 * is allowed, but they will just receive a new "done" event as soon | |
66 * as possible. | |
67 * | |
68 * Stream subscriptions always respect "pause" requests. If necessary they need | |
69 * to buffer their input, but often, and preferably, they can simply request | |
70 * their input to pause too. | |
71 * | |
72 * The default implementation of [isBroadcast] returns false. | |
73 * A broadcast stream inheriting from [Stream] must override [isBroadcast] | |
74 * to return `true`. | |
75 */ | |
76 abstract class Stream<T> { | |
77 Stream(); | |
78 | |
79 /** | |
80 * Internal use only. We do not want to promise that Stream stays const. | |
81 * | |
82 * If mixins become compatible with const constructors, we may use a | |
83 * stream mixin instead of extending Stream from a const class. | |
84 */ | |
85 const Stream._internal(); | |
86 | |
87 /** | |
88 * Creates an empty broadcast stream. | |
89 * | |
90 * This is a stream which does nothing except sending a done event | |
91 * when it's listened to. | |
92 */ | |
93 const factory Stream.empty() = _EmptyStream<T>; | |
94 | |
95 /** | |
96 * Creates a new single-subscription stream from the future. | |
97 * | |
98 * When the future completes, the stream will fire one event, either | |
99 * data or error, and then close with a done-event. | |
100 */ | |
101 factory Stream.fromFuture(Future<T> future) { | |
102 // Use the controller's buffering to fill in the value even before | |
103 // the stream has a listener. For a single value, it's not worth it | |
104 // to wait for a listener before doing the `then` on the future. | |
105 _StreamController<T> controller = new StreamController<T>(sync: true); | |
106 future.then((value) { | |
107 controller._add(value); | |
108 controller._closeUnchecked(); | |
109 }, | |
110 onError: (error, stackTrace) { | |
111 controller._addError(error, stackTrace); | |
112 controller._closeUnchecked(); | |
113 }); | |
114 return controller.stream; | |
115 } | |
116 | |
117 /** | |
118 * Create a stream from a group of futures. | |
119 * | |
120 * The stream reports the results of the futures on the stream in the order | |
121 * in which the futures complete. | |
122 * | |
123 * If some futures have completed before calling `Stream.fromFutures`, | |
124 * their result will be output on the created stream in some unspecified | |
125 * order. | |
126 * | |
127 * When all futures have completed, the stream is closed. | |
128 * | |
129 * If no future is passed, the stream closes as soon as possible. | |
130 */ | |
131 factory Stream.fromFutures(Iterable<Future<T>> futures) { | |
132 _StreamController<T> controller = new StreamController<T>(sync: true); | |
133 int count = 0; | |
134 var onValue = (T value) { | |
135 if (!controller.isClosed) { | |
136 controller._add(value); | |
137 if (--count == 0) controller._closeUnchecked(); | |
138 } | |
139 }; | |
140 var onError = (error, stack) { | |
141 if (!controller.isClosed) { | |
142 controller._addError(error, stack); | |
143 if (--count == 0) controller._closeUnchecked(); | |
144 } | |
145 }; | |
146 // The futures are already running, so start listening to them immediately | |
147 // (instead of waiting for the stream to be listened on). | |
148 // If we wait, we might not catch errors in the futures in time. | |
149 for (var future in futures) { | |
150 count++; | |
151 future.then(onValue, onError: onError); | |
152 } | |
153 // Use schedule microtask since controller is sync. | |
154 if (count == 0) scheduleMicrotask(controller.close); | |
155 return controller.stream; | |
156 } | |
157 | |
158 /** | |
159 * Creates a single-subscription stream that gets its data from [data]. | |
160 * | |
161 * The iterable is iterated when the stream receives a listener, and stops | |
162 * iterating if the listener cancels the subscription. | |
163 * | |
164 * If iterating [data] throws an error, the stream ends immediately with | |
165 * that error. No done event will be sent (iteration is not complete), but no | |
166 * further data events will be generated either, since iteration cannot | |
167 * continue. | |
168 */ | |
169 factory Stream.fromIterable(Iterable<T> data) { | |
170 return new _GeneratedStreamImpl<T>( | |
171 () => new _IterablePendingEvents<T>(data)); | |
172 } | |
173 | |
174 /** | |
175 * Creates a stream that repeatedly emits events at [period] intervals. | |
176 * | |
177 * The event values are computed by invoking [computation]. The argument to | |
178 * this callback is an integer that starts with 0 and is incremented for | |
179 * every event. | |
180 * | |
181 * If [computation] is omitted the event values will all be `null`. | |
182 */ | |
183 factory Stream.periodic(Duration period, | |
184 [T computation(int computationCount)]) { | |
185 Timer timer; | |
186 int computationCount = 0; | |
187 StreamController<T> controller; | |
188 // Counts the time that the Stream was running (and not paused). | |
189 Stopwatch watch = new Stopwatch(); | |
190 | |
191 void sendEvent() { | |
192 watch.reset(); | |
193 T data; | |
194 if (computation != null) { | |
195 try { | |
196 data = computation(computationCount++); | |
197 } catch (e, s) { | |
198 controller.addError(e, s); | |
199 return; | |
200 } | |
201 } | |
202 controller.add(data); | |
203 } | |
204 | |
205 void startPeriodicTimer() { | |
206 assert(timer == null); | |
207 timer = new Timer.periodic(period, (Timer timer) { | |
208 sendEvent(); | |
209 }); | |
210 } | |
211 | |
212 controller = new StreamController<T>(sync: true, | |
213 onListen: () { | |
214 watch.start(); | |
215 startPeriodicTimer(); | |
216 }, | |
217 onPause: () { | |
218 timer.cancel(); | |
219 timer = null; | |
220 watch.stop(); | |
221 }, | |
222 onResume: () { | |
223 assert(timer == null); | |
224 Duration elapsed = watch.elapsed; | |
225 watch.start(); | |
226 timer = new Timer(period - elapsed, () { | |
227 timer = null; | |
228 startPeriodicTimer(); | |
229 sendEvent(); | |
230 }); | |
231 }, | |
232 onCancel: () { | |
233 if (timer != null) timer.cancel(); | |
234 timer = null; | |
235 }); | |
236 return controller.stream; | |
237 } | |
238 | |
239 /** | |
240 * Creates a stream where all events of an existing stream are piped through | |
241 * a sink-transformation. | |
242 * | |
243 * The given [mapSink] closure is invoked when the returned stream is | |
244 * listened to. All events from the [source] are added into the event sink | |
245 * that is returned from the invocation. The transformation puts all | |
246 * transformed events into the sink the [mapSink] closure received during | |
247 * its invocation. Conceptually the [mapSink] creates a transformation pipe | |
248 * with the input sink being the returned [EventSink] and the output sink | |
249 * being the sink it received. | |
250 * | |
251 * This constructor is frequently used to build transformers. | |
252 * | |
253 * Example use for a duplicating transformer: | |
254 * | |
255 * class DuplicationSink implements EventSink<String> { | |
256 * final EventSink<String> _outputSink; | |
257 * DuplicationSink(this._outputSink); | |
258 * | |
259 * void add(String data) { | |
260 * _outputSink.add(data); | |
261 * _outputSink.add(data); | |
262 * } | |
263 * | |
264 * void addError(e, [st]) { _outputSink.addError(e, st); } | |
265 * void close() { _outputSink.close(); } | |
266 * } | |
267 * | |
268 * class DuplicationTransformer implements StreamTransformer<String, Strin
g> { | |
269 * // Some generic types ommitted for brevety. | |
270 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( | |
271 * stream, | |
272 * (EventSink sink) => new DuplicationSink(sink)); | |
273 * } | |
274 * | |
275 * stringStream.transform(new DuplicationTransformer()); | |
276 * | |
277 * The resulting stream is a broadcast stream if [source] is. | |
278 */ | |
279 factory Stream.eventTransformed(Stream source, | |
280 EventSink mapSink(EventSink<T> sink)) { | |
281 return new _BoundSinkStream(source, mapSink); | |
282 } | |
283 | |
284 /** | |
285 * Reports whether this stream is a broadcast stream. | |
286 */ | |
287 bool get isBroadcast => false; | |
288 | |
289 /** | |
290 * Returns a multi-subscription stream that produces the same events as this. | |
291 * | |
292 * The returned stream will subscribe to this stream when its first | |
293 * subscriber is added, and will stay subscribed until this stream ends, | |
294 * or a callback cancels the subscription. | |
295 * | |
296 * If [onListen] is provided, it is called with a subscription-like object | |
297 * that represents the underlying subscription to this stream. It is | |
298 * possible to pause, resume or cancel the subscription during the call | |
299 * to [onListen]. It is not possible to change the event handlers, including | |
300 * using [StreamSubscription.asFuture]. | |
301 * | |
302 * If [onCancel] is provided, it is called in a similar way to [onListen] | |
303 * when the returned stream stops having listener. If it later gets | |
304 * a new listener, the [onListen] function is called again. | |
305 * | |
306 * Use the callbacks, for example, for pausing the underlying subscription | |
307 * while having no subscribers to prevent losing events, or canceling the | |
308 * subscription when there are no listeners. | |
309 */ | |
310 Stream<T> asBroadcastStream({ | |
311 void onListen(StreamSubscription<T> subscription), | |
312 void onCancel(StreamSubscription<T> subscription) }) { | |
313 return new _AsBroadcastStream<T>(this, onListen, onCancel); | |
314 } | |
315 | |
316 /** | |
317 * Adds a subscription to this stream. | |
318 * | |
319 * On each data event from this stream, the subscriber's [onData] handler | |
320 * is called. If [onData] is null, nothing happens. | |
321 * | |
322 * On errors from this stream, the [onError] handler is given a | |
323 * object describing the error. | |
324 * | |
325 * The [onError] callback must be of type `void onError(error)` or | |
326 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts | |
327 * two arguments it is called with the stack trace (which could be `null` if | |
328 * the stream itself received an error without stack trace). | |
329 * Otherwise it is called with just the error object. | |
330 * If [onError] is omitted, any errors on the stream are considered unhandled, | |
331 * and will be passed to the current [Zone]'s error handler. | |
332 * By default unhandled async errors are treated | |
333 * as if they were uncaught top-level errors. | |
334 * | |
335 * If this stream closes, the [onDone] handler is called. | |
336 * | |
337 * If [cancelOnError] is true, the subscription is ended when | |
338 * the first error is reported. The default is false. | |
339 */ | |
340 StreamSubscription<T> listen(void onData(T event), | |
341 { Function onError, | |
342 void onDone(), | |
343 bool cancelOnError}); | |
344 | |
345 /** | |
346 * Creates a new stream from this stream that discards some data events. | |
347 * | |
348 * The new stream sends the same error and done events as this stream, | |
349 * but it only sends the data events that satisfy the [test]. | |
350 * | |
351 * The returned stream is a broadcast stream if this stream is. | |
352 * If a broadcast stream is listened to more than once, each subscription | |
353 * will individually perform the `test`. | |
354 */ | |
355 Stream<T> where(bool test(T event)) { | |
356 return new _WhereStream<T>(this, test); | |
357 } | |
358 | |
359 /** | |
360 * Creates a new stream that converts each element of this stream | |
361 * to a new value using the [convert] function. | |
362 * | |
363 * For each data event, `o`, in this stream, the returned stream | |
364 * provides a data event with the value `convert(o)`. | |
365 * If [convert] throws, the returned stream reports the exception as an error | |
366 * event instead. | |
367 * | |
368 * Error and done events are passed through unchanged to the returned stream. | |
369 * | |
370 * The returned stream is a broadcast stream if this stream is. | |
371 * The [convert] function is called once per data event per listener. | |
372 * If a broadcast stream is listened to more than once, each subscription | |
373 * will individually call [convert] on each data event. | |
374 */ | |
375 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { | |
376 return new _MapStream<T, dynamic/*=S*/>(this, convert); | |
377 } | |
378 | |
379 /** | |
380 * Creates a new stream with each data event of this stream asynchronously | |
381 * mapped to a new event. | |
382 * | |
383 * This acts like [map], except that [convert] may return a [Future], | |
384 * and in that case, the stream waits for that future to complete before | |
385 * continuing with its result. | |
386 * | |
387 * The returned stream is a broadcast stream if this stream is. | |
388 */ | |
389 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { | |
390 StreamController/*<E>*/ controller; | |
391 StreamSubscription/*<T>*/ subscription; | |
392 | |
393 void onListen() { | |
394 final add = controller.add; | |
395 assert(controller is _StreamController || | |
396 controller is _BroadcastStreamController); | |
397 final _EventSink/*<E>*/ eventSink = | |
398 controller as Object /*=_EventSink<E>*/; | |
399 final addError = eventSink._addError; | |
400 subscription = this.listen( | |
401 (T event) { | |
402 dynamic newValue; | |
403 try { | |
404 newValue = convert(event); | |
405 } catch (e, s) { | |
406 controller.addError(e, s); | |
407 return; | |
408 } | |
409 if (newValue is Future) { | |
410 subscription.pause(); | |
411 newValue.then(add, onError: addError) | |
412 .whenComplete(subscription.resume); | |
413 } else { | |
414 controller.add(newValue as Object/*=E*/); | |
415 } | |
416 }, | |
417 onError: addError, | |
418 onDone: controller.close | |
419 ); | |
420 } | |
421 | |
422 if (this.isBroadcast) { | |
423 controller = new StreamController/*<E>*/.broadcast( | |
424 onListen: onListen, | |
425 onCancel: () { subscription.cancel(); }, | |
426 sync: true | |
427 ); | |
428 } else { | |
429 controller = new StreamController/*<E>*/( | |
430 onListen: onListen, | |
431 onPause: () { subscription.pause(); }, | |
432 onResume: () { subscription.resume(); }, | |
433 onCancel: () { subscription.cancel(); }, | |
434 sync: true | |
435 ); | |
436 } | |
437 return controller.stream; | |
438 } | |
439 | |
440 /** | |
441 * Creates a new stream with the events of a stream per original event. | |
442 * | |
443 * This acts like [expand], except that [convert] returns a [Stream] | |
444 * instead of an [Iterable]. | |
445 * The events of the returned stream becomes the events of the returned | |
446 * stream, in the order they are produced. | |
447 * | |
448 * If [convert] returns `null`, no value is put on the output stream, | |
449 * just as if it returned an empty stream. | |
450 * | |
451 * The returned stream is a broadcast stream if this stream is. | |
452 */ | |
453 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { | |
454 StreamController/*<E>*/ controller; | |
455 StreamSubscription<T> subscription; | |
456 void onListen() { | |
457 assert(controller is _StreamController || | |
458 controller is _BroadcastStreamController); | |
459 final _EventSink/*<E>*/ eventSink = | |
460 controller as Object /*=_EventSink<E>*/; | |
461 subscription = this.listen( | |
462 (T event) { | |
463 Stream/*<E>*/ newStream; | |
464 try { | |
465 newStream = convert(event); | |
466 } catch (e, s) { | |
467 controller.addError(e, s); | |
468 return; | |
469 } | |
470 if (newStream != null) { | |
471 subscription.pause(); | |
472 controller.addStream(newStream) | |
473 .whenComplete(subscription.resume); | |
474 } | |
475 }, | |
476 onError: eventSink._addError, // Avoid Zone error replacement. | |
477 onDone: controller.close | |
478 ); | |
479 } | |
480 if (this.isBroadcast) { | |
481 controller = new StreamController/*<E>*/.broadcast( | |
482 onListen: onListen, | |
483 onCancel: () { subscription.cancel(); }, | |
484 sync: true | |
485 ); | |
486 } else { | |
487 controller = new StreamController/*<E>*/( | |
488 onListen: onListen, | |
489 onPause: () { subscription.pause(); }, | |
490 onResume: () { subscription.resume(); }, | |
491 onCancel: () { subscription.cancel(); }, | |
492 sync: true | |
493 ); | |
494 } | |
495 return controller.stream; | |
496 } | |
497 | |
498 /** | |
499 * Creates a wrapper Stream that intercepts some errors from this stream. | |
500 * | |
501 * If this stream sends an error that matches [test], then it is intercepted | |
502 * by the [handle] function. | |
503 * | |
504 * The [onError] callback must be of type `void onError(error)` or | |
505 * `void onError(error, StackTrace stackTrace)`. Depending on the function | |
506 * type the stream either invokes [onError] with or without a stack | |
507 * trace. The stack trace argument might be `null` if the stream itself | |
508 * received an error without stack trace. | |
509 * | |
510 * An asynchronous error [:e:] is matched by a test function if [:test(e):] | |
511 * returns true. If [test] is omitted, every error is considered matching. | |
512 * | |
513 * If the error is intercepted, the [handle] function can decide what to do | |
514 * with it. It can throw if it wants to raise a new (or the same) error, | |
515 * or simply return to make the stream forget the error. | |
516 * | |
517 * If you need to transform an error into a data event, use the more generic | |
518 * [Stream.transform] to handle the event by writing a data event to | |
519 * the output sink. | |
520 * | |
521 * The returned stream is a broadcast stream if this stream is. | |
522 * If a broadcast stream is listened to more than once, each subscription | |
523 * will individually perform the `test` and handle the error. | |
524 */ | |
525 Stream<T> handleError(Function onError, { bool test(error) }) { | |
526 return new _HandleErrorStream<T>(this, onError, test); | |
527 } | |
528 | |
529 /** | |
530 * Creates a new stream from this stream that converts each element | |
531 * into zero or more events. | |
532 * | |
533 * Each incoming event is converted to an [Iterable] of new events, | |
534 * and each of these new events are then sent by the returned stream | |
535 * in order. | |
536 * | |
537 * The returned stream is a broadcast stream if this stream is. | |
538 * If a broadcast stream is listened to more than once, each subscription | |
539 * will individually call `convert` and expand the events. | |
540 */ | |
541 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { | |
542 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | |
543 } | |
544 | |
545 /** | |
546 * Pipe the events of this stream into [streamConsumer]. | |
547 * | |
548 * The events of this stream are added to `streamConsumer` using | |
549 * [StreamConsumer.addStream]. | |
550 * The `streamConsumer` is closed when this stream has been successfully added | |
551 * to it - when the future returned by `addStream` completes without an error. | |
552 * | |
553 * Returns a future which completes when the stream has been consumed | |
554 * and the consumer has been closed. | |
555 * | |
556 * The returned future completes with the same result as the future returned | |
557 * by [StreamConsumer.close]. | |
558 * If the adding of the stream itself fails in some way, | |
559 * then the consumer is expected to be closed, and won't be closed again. | |
560 * In that case the returned future completes with the error from calling | |
561 * `addStream`. | |
562 */ | |
563 Future pipe(StreamConsumer<T> streamConsumer) { | |
564 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); | |
565 } | |
566 | |
567 /** | |
568 * Chains this stream as the input of the provided [StreamTransformer]. | |
569 * | |
570 * Returns the result of [:streamTransformer.bind:] itself. | |
571 * | |
572 * The `streamTransformer` can decide whether it wants to return a | |
573 * broadcast stream or not. | |
574 */ | |
575 Stream/*<S>*/ transform/*<S>*/( | |
576 StreamTransformer<T, dynamic/*=S*/ > streamTransformer) { | |
577 return streamTransformer.bind(this); | |
578 } | |
579 | |
580 /** | |
581 * Reduces a sequence of values by repeatedly applying [combine]. | |
582 */ | |
583 Future<T> reduce(T combine(T previous, T element)) { | |
584 _Future<T> result = new _Future<T>(); | |
585 bool seenFirst = false; | |
586 T value; | |
587 StreamSubscription subscription; | |
588 subscription = this.listen( | |
589 (T element) { | |
590 if (seenFirst) { | |
591 _runUserCode(() => combine(value, element), | |
592 (T newValue) { value = newValue; }, | |
593 _cancelAndErrorClosure(subscription, result)); | |
594 } else { | |
595 value = element; | |
596 seenFirst = true; | |
597 } | |
598 }, | |
599 onError: result._completeError, | |
600 onDone: () { | |
601 if (!seenFirst) { | |
602 try { | |
603 throw IterableElementError.noElement(); | |
604 } catch (e, s) { | |
605 _completeWithErrorCallback(result, e, s); | |
606 } | |
607 } else { | |
608 result._complete(value); | |
609 } | |
610 }, | |
611 cancelOnError: true | |
612 ); | |
613 return result; | |
614 } | |
615 | |
616 /** Reduces a sequence of values by repeatedly applying [combine]. */ | |
617 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, | |
618 /*=S*/ combine(var/*=S*/ previous, T element)) { | |
619 | |
620 _Future/*<S>*/ result = new _Future/*<S>*/(); | |
621 var/*=S*/ value = initialValue; | |
622 StreamSubscription subscription; | |
623 subscription = this.listen( | |
624 (T element) { | |
625 _runUserCode( | |
626 () => combine(value, element), | |
627 (/*=S*/ newValue) { value = newValue; }, | |
628 _cancelAndErrorClosure(subscription, result) | |
629 ); | |
630 }, | |
631 onError: (e, st) { | |
632 result._completeError(e, st); | |
633 }, | |
634 onDone: () { | |
635 result._complete(value); | |
636 }, | |
637 cancelOnError: true); | |
638 return result; | |
639 } | |
640 | |
641 /** | |
642 * Collects string of data events' string representations. | |
643 * | |
644 * If [separator] is provided, it is inserted between any two | |
645 * elements. | |
646 * | |
647 * Any error in the stream causes the future to complete with that | |
648 * error. Otherwise it completes with the collected string when | |
649 * the "done" event arrives. | |
650 */ | |
651 Future<String> join([String separator = ""]) { | |
652 _Future<String> result = new _Future<String>(); | |
653 StringBuffer buffer = new StringBuffer(); | |
654 StreamSubscription subscription; | |
655 bool first = true; | |
656 subscription = this.listen( | |
657 (T element) { | |
658 if (!first) { | |
659 buffer.write(separator); | |
660 } | |
661 first = false; | |
662 try { | |
663 buffer.write(element); | |
664 } catch (e, s) { | |
665 _cancelAndErrorWithReplacement(subscription, result, e, s); | |
666 } | |
667 }, | |
668 onError: (e) { | |
669 result._completeError(e); | |
670 }, | |
671 onDone: () { | |
672 result._complete(buffer.toString()); | |
673 }, | |
674 cancelOnError: true); | |
675 return result; | |
676 } | |
677 | |
678 /** | |
679 * Checks whether [needle] occurs in the elements provided by this stream. | |
680 * | |
681 * Completes the [Future] when the answer is known. | |
682 * If this stream reports an error, the [Future] will report that error. | |
683 */ | |
684 Future<bool> contains(Object needle) { | |
685 _Future<bool> future = new _Future<bool>(); | |
686 StreamSubscription subscription; | |
687 subscription = this.listen( | |
688 (T element) { | |
689 _runUserCode( | |
690 () => (element == needle), | |
691 (bool isMatch) { | |
692 if (isMatch) { | |
693 _cancelAndValue(subscription, future, true); | |
694 } | |
695 }, | |
696 _cancelAndErrorClosure(subscription, future) | |
697 ); | |
698 }, | |
699 onError: future._completeError, | |
700 onDone: () { | |
701 future._complete(false); | |
702 }, | |
703 cancelOnError: true); | |
704 return future; | |
705 } | |
706 | |
707 /** | |
708 * Executes [action] on each data event of the stream. | |
709 * | |
710 * Completes the returned [Future] when all events of the stream | |
711 * have been processed. Completes the future with an error if the | |
712 * stream has an error event, or if [action] throws. | |
713 */ | |
714 Future forEach(void action(T element)) { | |
715 _Future future = new _Future(); | |
716 StreamSubscription subscription; | |
717 subscription = this.listen( | |
718 (T element) { | |
719 _runUserCode( | |
720 () => action(element), | |
721 (_) {}, | |
722 _cancelAndErrorClosure(subscription, future) | |
723 ); | |
724 }, | |
725 onError: future._completeError, | |
726 onDone: () { | |
727 future._complete(null); | |
728 }, | |
729 cancelOnError: true); | |
730 return future; | |
731 } | |
732 | |
733 /** | |
734 * Checks whether [test] accepts all elements provided by this stream. | |
735 * | |
736 * Completes the [Future] when the answer is known. | |
737 * If this stream reports an error, the [Future] will report that error. | |
738 */ | |
739 Future<bool> every(bool test(T element)) { | |
740 _Future<bool> future = new _Future<bool>(); | |
741 StreamSubscription subscription; | |
742 subscription = this.listen( | |
743 (T element) { | |
744 _runUserCode( | |
745 () => test(element), | |
746 (bool isMatch) { | |
747 if (!isMatch) { | |
748 _cancelAndValue(subscription, future, false); | |
749 } | |
750 }, | |
751 _cancelAndErrorClosure(subscription, future) | |
752 ); | |
753 }, | |
754 onError: future._completeError, | |
755 onDone: () { | |
756 future._complete(true); | |
757 }, | |
758 cancelOnError: true); | |
759 return future; | |
760 } | |
761 | |
762 /** | |
763 * Checks whether [test] accepts any element provided by this stream. | |
764 * | |
765 * Completes the [Future] when the answer is known. | |
766 * | |
767 * If this stream reports an error, the [Future] reports that error. | |
768 * | |
769 * Stops listening to the stream after the first matching element has been | |
770 * found. | |
771 * | |
772 * Internally the method cancels its subscription after this element. This | |
773 * means that single-subscription (non-broadcast) streams are closed and | |
774 * cannot be reused after a call to this method. | |
775 */ | |
776 Future<bool> any(bool test(T element)) { | |
777 _Future<bool> future = new _Future<bool>(); | |
778 StreamSubscription subscription; | |
779 subscription = this.listen( | |
780 (T element) { | |
781 _runUserCode( | |
782 () => test(element), | |
783 (bool isMatch) { | |
784 if (isMatch) { | |
785 _cancelAndValue(subscription, future, true); | |
786 } | |
787 }, | |
788 _cancelAndErrorClosure(subscription, future) | |
789 ); | |
790 }, | |
791 onError: future._completeError, | |
792 onDone: () { | |
793 future._complete(false); | |
794 }, | |
795 cancelOnError: true); | |
796 return future; | |
797 } | |
798 | |
799 | |
800 /** Counts the elements in the stream. */ | |
801 Future<int> get length { | |
802 _Future<int> future = new _Future<int>(); | |
803 int count = 0; | |
804 this.listen( | |
805 (_) { count++; }, | |
806 onError: future._completeError, | |
807 onDone: () { | |
808 future._complete(count); | |
809 }, | |
810 cancelOnError: true); | |
811 return future; | |
812 } | |
813 | |
814 /** | |
815 * Reports whether this stream contains any elements. | |
816 * | |
817 * Stops listening to the stream after the first element has been received. | |
818 * | |
819 * Internally the method cancels its subscription after the first element. | |
820 * This means that single-subscription (non-broadcast) streams are closed and | |
821 * cannot be reused after a call to this getter. | |
822 */ | |
823 Future<bool> get isEmpty { | |
824 _Future<bool> future = new _Future<bool>(); | |
825 StreamSubscription subscription; | |
826 subscription = this.listen( | |
827 (_) { | |
828 _cancelAndValue(subscription, future, false); | |
829 }, | |
830 onError: future._completeError, | |
831 onDone: () { | |
832 future._complete(true); | |
833 }, | |
834 cancelOnError: true); | |
835 return future; | |
836 } | |
837 | |
838 /** Collects the data of this stream in a [List]. */ | |
839 Future<List<T>> toList() { | |
840 List<T> result = <T>[]; | |
841 _Future<List<T>> future = new _Future<List<T>>(); | |
842 this.listen( | |
843 (T data) { | |
844 result.add(data); | |
845 }, | |
846 onError: future._completeError, | |
847 onDone: () { | |
848 future._complete(result); | |
849 }, | |
850 cancelOnError: true); | |
851 return future; | |
852 } | |
853 | |
854 /** | |
855 * Collects the data of this stream in a [Set]. | |
856 * | |
857 * The returned set is the same type as returned by `new Set<T>()`. | |
858 * If another type of set is needed, either use [forEach] to add each | |
859 * element to the set, or use | |
860 * `toList().then((list) => new SomeOtherSet.from(list))` | |
861 * to create the set. | |
862 */ | |
863 Future<Set<T>> toSet() { | |
864 Set<T> result = new Set<T>(); | |
865 _Future<Set<T>> future = new _Future<Set<T>>(); | |
866 this.listen( | |
867 (T data) { | |
868 result.add(data); | |
869 }, | |
870 onError: future._completeError, | |
871 onDone: () { | |
872 future._complete(result); | |
873 }, | |
874 cancelOnError: true); | |
875 return future; | |
876 } | |
877 | |
878 /** | |
879 * Discards all data on the stream, but signals when it's done or an error | |
880 * occured. | |
881 * | |
882 * When subscribing using [drain], cancelOnError will be true. This means | |
883 * that the future will complete with the first error on the stream and then | |
884 * cancel the subscription. | |
885 * | |
886 * In case of a `done` event the future completes with the given | |
887 * [futureValue]. | |
888 */ | |
889 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) | |
890 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); | |
891 | |
892 /** | |
893 * Provides at most the first [n] values of this stream. | |
894 * | |
895 * Forwards the first [n] data events of this stream, and all error | |
896 * events, to the returned stream, and ends with a done event. | |
897 * | |
898 * If this stream produces fewer than [count] values before it's done, | |
899 * so will the returned stream. | |
900 * | |
901 * Stops listening to the stream after the first [n] elements have been | |
902 * received. | |
903 * | |
904 * Internally the method cancels its subscription after these elements. This | |
905 * means that single-subscription (non-broadcast) streams are closed and | |
906 * cannot be reused after a call to this method. | |
907 * | |
908 * The returned stream is a broadcast stream if this stream is. | |
909 * For a broadcast stream, the events are only counted from the time | |
910 * the returned stream is listened to. | |
911 */ | |
912 Stream<T> take(int count) { | |
913 return new _TakeStream<T>(this, count); | |
914 } | |
915 | |
916 /** | |
917 * Forwards data events while [test] is successful. | |
918 * | |
919 * The returned stream provides the same events as this stream as long | |
920 * as [test] returns [:true:] for the event data. The stream is done | |
921 * when either this stream is done, or when this stream first provides | |
922 * a value that [test] doesn't accept. | |
923 * | |
924 * Stops listening to the stream after the accepted elements. | |
925 * | |
926 * Internally the method cancels its subscription after these elements. This | |
927 * means that single-subscription (non-broadcast) streams are closed and | |
928 * cannot be reused after a call to this method. | |
929 * | |
930 * The returned stream is a broadcast stream if this stream is. | |
931 * For a broadcast stream, the events are only tested from the time | |
932 * the returned stream is listened to. | |
933 */ | |
934 Stream<T> takeWhile(bool test(T element)) { | |
935 return new _TakeWhileStream<T>(this, test); | |
936 } | |
937 | |
938 /** | |
939 * Skips the first [count] data events from this stream. | |
940 * | |
941 * The returned stream is a broadcast stream if this stream is. | |
942 * For a broadcast stream, the events are only counted from the time | |
943 * the returned stream is listened to. | |
944 */ | |
945 Stream<T> skip(int count) { | |
946 return new _SkipStream<T>(this, count); | |
947 } | |
948 | |
949 /** | |
950 * Skip data events from this stream while they are matched by [test]. | |
951 * | |
952 * Error and done events are provided by the returned stream unmodified. | |
953 * | |
954 * Starting with the first data event where [test] returns false for the | |
955 * event data, the returned stream will have the same events as this stream. | |
956 * | |
957 * The returned stream is a broadcast stream if this stream is. | |
958 * For a broadcast stream, the events are only tested from the time | |
959 * the returned stream is listened to. | |
960 */ | |
961 Stream<T> skipWhile(bool test(T element)) { | |
962 return new _SkipWhileStream<T>(this, test); | |
963 } | |
964 | |
965 /** | |
966 * Skips data events if they are equal to the previous data event. | |
967 * | |
968 * The returned stream provides the same events as this stream, except | |
969 * that it never provides two consecutive data events that are equal. | |
970 * | |
971 * Equality is determined by the provided [equals] method. If that is | |
972 * omitted, the '==' operator on the last provided data element is used. | |
973 * | |
974 * The returned stream is a broadcast stream if this stream is. | |
975 * If a broadcast stream is listened to more than once, each subscription | |
976 * will individually perform the `equals` test. | |
977 */ | |
978 Stream<T> distinct([bool equals(T previous, T next)]) { | |
979 return new _DistinctStream<T>(this, equals); | |
980 } | |
981 | |
982 /** | |
983 * Returns the first element of the stream. | |
984 * | |
985 * Stops listening to the stream after the first element has been received. | |
986 * | |
987 * Internally the method cancels its subscription after the first element. | |
988 * This means that single-subscription (non-broadcast) streams are closed | |
989 * and cannot be reused after a call to this getter. | |
990 * | |
991 * If an error event occurs before the first data event, the resulting future | |
992 * is completed with that error. | |
993 * | |
994 * If this stream is empty (a done event occurs before the first data event), | |
995 * the resulting future completes with a [StateError]. | |
996 * | |
997 * Except for the type of the error, this method is equivalent to | |
998 * [:this.elementAt(0):]. | |
999 */ | |
1000 Future<T> get first { | |
1001 _Future<T> future = new _Future<T>(); | |
1002 StreamSubscription subscription; | |
1003 subscription = this.listen( | |
1004 (T value) { | |
1005 _cancelAndValue(subscription, future, value); | |
1006 }, | |
1007 onError: future._completeError, | |
1008 onDone: () { | |
1009 try { | |
1010 throw IterableElementError.noElement(); | |
1011 } catch (e, s) { | |
1012 _completeWithErrorCallback(future, e, s); | |
1013 } | |
1014 }, | |
1015 cancelOnError: true); | |
1016 return future; | |
1017 } | |
1018 | |
1019 /** | |
1020 * Returns the last element of the stream. | |
1021 * | |
1022 * If an error event occurs before the first data event, the resulting future | |
1023 * is completed with that error. | |
1024 * | |
1025 * If this stream is empty (a done event occurs before the first data event), | |
1026 * the resulting future completes with a [StateError]. | |
1027 */ | |
1028 Future<T> get last { | |
1029 _Future<T> future = new _Future<T>(); | |
1030 T result = null; | |
1031 bool foundResult = false; | |
1032 listen( | |
1033 (T value) { | |
1034 foundResult = true; | |
1035 result = value; | |
1036 }, | |
1037 onError: future._completeError, | |
1038 onDone: () { | |
1039 if (foundResult) { | |
1040 future._complete(result); | |
1041 return; | |
1042 } | |
1043 try { | |
1044 throw IterableElementError.noElement(); | |
1045 } catch (e, s) { | |
1046 _completeWithErrorCallback(future, e, s); | |
1047 } | |
1048 }, | |
1049 cancelOnError: true); | |
1050 return future; | |
1051 } | |
1052 | |
1053 /** | |
1054 * Returns the single element. | |
1055 * | |
1056 * If an error event occurs before or after the first data event, the | |
1057 * resulting future is completed with that error. | |
1058 * | |
1059 * If [this] is empty or has more than one element throws a [StateError]. | |
1060 */ | |
1061 Future<T> get single { | |
1062 _Future<T> future = new _Future<T>(); | |
1063 T result = null; | |
1064 bool foundResult = false; | |
1065 StreamSubscription subscription; | |
1066 subscription = this.listen( | |
1067 (T value) { | |
1068 if (foundResult) { | |
1069 // This is the second element we get. | |
1070 try { | |
1071 throw IterableElementError.tooMany(); | |
1072 } catch (e, s) { | |
1073 _cancelAndErrorWithReplacement(subscription, future, e, s); | |
1074 } | |
1075 return; | |
1076 } | |
1077 foundResult = true; | |
1078 result = value; | |
1079 }, | |
1080 onError: future._completeError, | |
1081 onDone: () { | |
1082 if (foundResult) { | |
1083 future._complete(result); | |
1084 return; | |
1085 } | |
1086 try { | |
1087 throw IterableElementError.noElement(); | |
1088 } catch (e, s) { | |
1089 _completeWithErrorCallback(future, e, s); | |
1090 } | |
1091 }, | |
1092 cancelOnError: true); | |
1093 return future; | |
1094 } | |
1095 | |
1096 /** | |
1097 * Finds the first element of this stream matching [test]. | |
1098 * | |
1099 * Returns a future that is filled with the first element of this stream | |
1100 * that [test] returns true for. | |
1101 * | |
1102 * If no such element is found before this stream is done, and a | |
1103 * [defaultValue] function is provided, the result of calling [defaultValue] | |
1104 * becomes the value of the future. | |
1105 * | |
1106 * Stops listening to the stream after the first matching element has been | |
1107 * received. | |
1108 * | |
1109 * Internally the method cancels its subscription after the first element that | |
1110 * matches the predicate. This means that single-subscription (non-broadcast) | |
1111 * streams are closed and cannot be reused after a call to this method. | |
1112 * | |
1113 * If an error occurs, or if this stream ends without finding a match and | |
1114 * with no [defaultValue] function provided, the future will receive an | |
1115 * error. | |
1116 */ | |
1117 Future<dynamic> firstWhere(bool test(T element), {Object defaultValue()}) { | |
1118 _Future<dynamic> future = new _Future(); | |
1119 StreamSubscription subscription; | |
1120 subscription = this.listen( | |
1121 (T value) { | |
1122 _runUserCode( | |
1123 () => test(value), | |
1124 (bool isMatch) { | |
1125 if (isMatch) { | |
1126 _cancelAndValue(subscription, future, value); | |
1127 } | |
1128 }, | |
1129 _cancelAndErrorClosure(subscription, future) | |
1130 ); | |
1131 }, | |
1132 onError: future._completeError, | |
1133 onDone: () { | |
1134 if (defaultValue != null) { | |
1135 _runUserCode(defaultValue, future._complete, future._completeError); | |
1136 return; | |
1137 } | |
1138 try { | |
1139 throw IterableElementError.noElement(); | |
1140 } catch (e, s) { | |
1141 _completeWithErrorCallback(future, e, s); | |
1142 } | |
1143 }, | |
1144 cancelOnError: true); | |
1145 return future; | |
1146 } | |
1147 | |
1148 /** | |
1149 * Finds the last element in this stream matching [test]. | |
1150 * | |
1151 * As [firstWhere], except that the last matching element is found. | |
1152 * That means that the result cannot be provided before this stream | |
1153 * is done. | |
1154 */ | |
1155 Future<dynamic> lastWhere(bool test(T element), {Object defaultValue()}) { | |
1156 _Future<dynamic> future = new _Future(); | |
1157 T result = null; | |
1158 bool foundResult = false; | |
1159 StreamSubscription subscription; | |
1160 subscription = this.listen( | |
1161 (T value) { | |
1162 _runUserCode( | |
1163 () => true == test(value), | |
1164 (bool isMatch) { | |
1165 if (isMatch) { | |
1166 foundResult = true; | |
1167 result = value; | |
1168 } | |
1169 }, | |
1170 _cancelAndErrorClosure(subscription, future) | |
1171 ); | |
1172 }, | |
1173 onError: future._completeError, | |
1174 onDone: () { | |
1175 if (foundResult) { | |
1176 future._complete(result); | |
1177 return; | |
1178 } | |
1179 if (defaultValue != null) { | |
1180 _runUserCode(defaultValue, future._complete, future._completeError); | |
1181 return; | |
1182 } | |
1183 try { | |
1184 throw IterableElementError.noElement(); | |
1185 } catch (e, s) { | |
1186 _completeWithErrorCallback(future, e, s); | |
1187 } | |
1188 }, | |
1189 cancelOnError: true); | |
1190 return future; | |
1191 } | |
1192 | |
1193 /** | |
1194 * Finds the single element in this stream matching [test]. | |
1195 * | |
1196 * Like [lastMatch], except that it is an error if more than one | |
1197 * matching element occurs in the stream. | |
1198 */ | |
1199 Future<T> singleWhere(bool test(T element)) { | |
1200 _Future<T> future = new _Future<T>(); | |
1201 T result = null; | |
1202 bool foundResult = false; | |
1203 StreamSubscription subscription; | |
1204 subscription = this.listen( | |
1205 (T value) { | |
1206 _runUserCode( | |
1207 () => true == test(value), | |
1208 (bool isMatch) { | |
1209 if (isMatch) { | |
1210 if (foundResult) { | |
1211 try { | |
1212 throw IterableElementError.tooMany(); | |
1213 } catch (e, s) { | |
1214 _cancelAndErrorWithReplacement(subscription, future, e, s); | |
1215 } | |
1216 return; | |
1217 } | |
1218 foundResult = true; | |
1219 result = value; | |
1220 } | |
1221 }, | |
1222 _cancelAndErrorClosure(subscription, future) | |
1223 ); | |
1224 }, | |
1225 onError: future._completeError, | |
1226 onDone: () { | |
1227 if (foundResult) { | |
1228 future._complete(result); | |
1229 return; | |
1230 } | |
1231 try { | |
1232 throw IterableElementError.noElement(); | |
1233 } catch (e, s) { | |
1234 _completeWithErrorCallback(future, e, s); | |
1235 } | |
1236 }, | |
1237 cancelOnError: true); | |
1238 return future; | |
1239 } | |
1240 | |
1241 /** | |
1242 * Returns the value of the [index]th data event of this stream. | |
1243 * | |
1244 * Stops listening to the stream after the [index]th data event has been | |
1245 * received. | |
1246 * | |
1247 * Internally the method cancels its subscription after these elements. This | |
1248 * means that single-subscription (non-broadcast) streams are closed and | |
1249 * cannot be reused after a call to this method. | |
1250 * | |
1251 * If an error event occurs before the value is found, the future completes | |
1252 * with this error. | |
1253 * | |
1254 * If a done event occurs before the value is found, the future completes | |
1255 * with a [RangeError]. | |
1256 */ | |
1257 Future<T> elementAt(int index) { | |
1258 if (index is! int || index < 0) throw new ArgumentError(index); | |
1259 _Future<T> future = new _Future<T>(); | |
1260 StreamSubscription subscription; | |
1261 int elementIndex = 0; | |
1262 subscription = this.listen( | |
1263 (T value) { | |
1264 if (index == elementIndex) { | |
1265 _cancelAndValue(subscription, future, value); | |
1266 return; | |
1267 } | |
1268 elementIndex += 1; | |
1269 }, | |
1270 onError: future._completeError, | |
1271 onDone: () { | |
1272 future._completeError( | |
1273 new RangeError.index(index, this, "index", null, elementIndex)); | |
1274 }, | |
1275 cancelOnError: true); | |
1276 return future; | |
1277 } | |
1278 | |
1279 /** | |
1280 * Creates a new stream with the same events as this stream. | |
1281 * | |
1282 * Whenever more than [timeLimit] passes between two events from this stream, | |
1283 * the [onTimeout] function is called. | |
1284 * | |
1285 * The countdown doesn't start until the returned stream is listened to. | |
1286 * The countdown is reset every time an event is forwarded from this stream, | |
1287 * or when the stream is paused and resumed. | |
1288 * | |
1289 * The [onTimeout] function is called with one argument: an | |
1290 * [EventSink] that allows putting events into the returned stream. | |
1291 * This `EventSink` is only valid during the call to `onTimeout`. | |
1292 * | |
1293 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException] | |
1294 * into the error channel of the returned stream. | |
1295 * | |
1296 * The returned stream is a broadcast stream if this stream is. | |
1297 * If a broadcast stream is listened to more than once, each subscription | |
1298 * will have its individually timer that starts counting on listen, | |
1299 * and the subscriptions' timers can be paused individually. | |
1300 */ | |
1301 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) { | |
1302 StreamController<T> controller; | |
1303 // The following variables are set on listen. | |
1304 StreamSubscription<T> subscription; | |
1305 Timer timer; | |
1306 Zone zone; | |
1307 _TimerCallback timeout; | |
1308 | |
1309 void onData(T event) { | |
1310 timer.cancel(); | |
1311 controller.add(event); | |
1312 timer = zone.createTimer(timeLimit, timeout); | |
1313 } | |
1314 void onError(error, StackTrace stackTrace) { | |
1315 timer.cancel(); | |
1316 assert(controller is _StreamController || | |
1317 controller is _BroadcastStreamController); | |
1318 dynamic eventSink = controller; | |
1319 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | |
1320 timer = zone.createTimer(timeLimit, timeout); | |
1321 } | |
1322 void onDone() { | |
1323 timer.cancel(); | |
1324 controller.close(); | |
1325 } | |
1326 void onListen() { | |
1327 // This is the onListen callback for of controller. | |
1328 // It runs in the same zone that the subscription was created in. | |
1329 // Use that zone for creating timers and running the onTimeout | |
1330 // callback. | |
1331 zone = Zone.current; | |
1332 if (onTimeout == null) { | |
1333 timeout = () { | |
1334 controller.addError(new TimeoutException("No stream event", | |
1335 timeLimit), null); | |
1336 }; | |
1337 } else { | |
1338 // TODO(floitsch): the return type should be 'void', and the type | |
1339 // should be inferred. | |
1340 var registeredOnTimeout = | |
1341 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); | |
1342 _ControllerEventSinkWrapper wrapper = | |
1343 new _ControllerEventSinkWrapper(null); | |
1344 timeout = () { | |
1345 wrapper._sink = controller; // Only valid during call. | |
1346 zone.runUnaryGuarded(registeredOnTimeout, wrapper); | |
1347 wrapper._sink = null; | |
1348 }; | |
1349 } | |
1350 | |
1351 subscription = this.listen(onData, onError: onError, onDone: onDone); | |
1352 timer = zone.createTimer(timeLimit, timeout); | |
1353 } | |
1354 Future onCancel() { | |
1355 timer.cancel(); | |
1356 Future result = subscription.cancel(); | |
1357 subscription = null; | |
1358 return result; | |
1359 } | |
1360 controller = isBroadcast | |
1361 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) | |
1362 : new _SyncStreamController<T>( | |
1363 onListen, | |
1364 () { | |
1365 // Don't null the timer, onCancel may call cancel again. | |
1366 timer.cancel(); | |
1367 subscription.pause(); | |
1368 }, | |
1369 () { | |
1370 subscription.resume(); | |
1371 timer = zone.createTimer(timeLimit, timeout); | |
1372 }, | |
1373 onCancel); | |
1374 return controller.stream; | |
1375 } | |
1376 } | |
1377 | |
1378 /** | |
1379 * A subscription on events from a [Stream]. | |
1380 * | |
1381 * When you listen on a [Stream] using [Stream.listen], | |
1382 * a [StreamSubscription] object is returned. | |
1383 * | |
1384 * The subscription provides events to the listener, | |
1385 * and holds the callbacks used to handle the events. | |
1386 * The subscription can also be used to unsubscribe from the events, | |
1387 * or to temporarily pause the events from the stream. | |
1388 */ | |
1389 abstract class StreamSubscription<T> { | |
1390 /** | |
1391 * Cancels this subscription. | |
1392 * | |
1393 * After this call, the subscription no longer receives events. | |
1394 * | |
1395 * The stream may need to shut down the source of events and clean up after | |
1396 * the subscription is canceled. | |
1397 * | |
1398 * Returns a future that is completed once the stream has finished | |
1399 * its cleanup. May also return `null` if no cleanup was necessary. | |
1400 * | |
1401 * Typically, futures are returned when the stream needs to release resources. | |
1402 * For example, a stream might need to close an open file (as an asynchronous | |
1403 * operation). If the listener wants to delete the file after having | |
1404 * canceled the subscription, it must wait for the cleanup future to complete. | |
1405 * | |
1406 * A returned future completes with a `null` value. | |
1407 * If the cleanup throws, which it really shouldn't, the returned future | |
1408 * completes with that error. | |
1409 */ | |
1410 Future cancel(); | |
1411 | |
1412 /** | |
1413 * Set or override the data event handler of this subscription. | |
1414 * | |
1415 * This method overrides the handler that has been set at the invocation of | |
1416 * [Stream.listen]. | |
1417 */ | |
1418 void onData(void handleData(T data)); | |
1419 | |
1420 /** | |
1421 * Set or override the error event handler of this subscription. | |
1422 * | |
1423 * This method overrides the handler that has been set at the invocation of | |
1424 * [Stream.listen] or by calling [asFuture]. | |
1425 */ | |
1426 void onError(Function handleError); | |
1427 | |
1428 /** | |
1429 * Set or override the done event handler of this subscription. | |
1430 * | |
1431 * This method overrides the handler that has been set at the invocation of | |
1432 * [Stream.listen] or by calling [asFuture]. | |
1433 */ | |
1434 void onDone(void handleDone()); | |
1435 | |
1436 /** | |
1437 * Request that the stream pauses events until further notice. | |
1438 * | |
1439 * While paused, the subscription will not fire any events. | |
1440 * If it receives events from its source, they will be buffered until | |
1441 * the subscription is resumed. | |
1442 * The underlying source is usually informed about the pause, | |
1443 * so it can stop generating events until the subscription is resumed. | |
1444 * | |
1445 * To avoid buffering events on a broadcast stream, it is better to | |
1446 * cancel this subscription, and start to listen again when events | |
1447 * are needed. | |
1448 * | |
1449 * If [resumeSignal] is provided, the stream will undo the pause | |
1450 * when the future completes. If the future completes with an error, | |
1451 * the stream will resume, but the error will not be handled! | |
1452 * | |
1453 * A call to [resume] will also undo a pause. | |
1454 * | |
1455 * If the subscription is paused more than once, an equal number | |
1456 * of resumes must be performed to resume the stream. | |
1457 * | |
1458 * Currently DOM streams silently drop events when the stream is paused. This | |
1459 * is a bug and will be fixed. | |
1460 */ | |
1461 void pause([Future resumeSignal]); | |
1462 | |
1463 /** | |
1464 * Resume after a pause. | |
1465 */ | |
1466 void resume(); | |
1467 | |
1468 /** | |
1469 * Returns true if the [StreamSubscription] is paused. | |
1470 */ | |
1471 bool get isPaused; | |
1472 | |
1473 /** | |
1474 * Returns a future that handles the [onDone] and [onError] callbacks. | |
1475 * | |
1476 * This method *overwrites* the existing [onDone] and [onError] callbacks | |
1477 * with new ones that complete the returned future. | |
1478 * | |
1479 * In case of an error the subscription will automatically cancel (even | |
1480 * when it was listening with `cancelOnError` set to `false`). | |
1481 * | |
1482 * In case of a `done` event the future completes with the given | |
1483 * [futureValue]. | |
1484 */ | |
1485 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); | |
1486 } | |
1487 | |
1488 | |
1489 /** | |
1490 * An interface that abstracts creation or handling of [Stream] events. | |
1491 */ | |
1492 abstract class EventSink<T> implements Sink<T> { | |
1493 /** Send a data event to a stream. */ | |
1494 void add(T event); | |
1495 | |
1496 /** Send an async error to a stream. */ | |
1497 void addError(errorEvent, [StackTrace stackTrace]); | |
1498 | |
1499 /** Close the sink. No further events can be added after closing. */ | |
1500 void close(); | |
1501 } | |
1502 | |
1503 | |
1504 /** [Stream] wrapper that only exposes the [Stream] interface. */ | |
1505 class StreamView<T> extends Stream<T> { | |
1506 final Stream<T> _stream; | |
1507 | |
1508 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); | |
1509 | |
1510 bool get isBroadcast => _stream.isBroadcast; | |
1511 | |
1512 Stream<T> asBroadcastStream( | |
1513 {void onListen(StreamSubscription<T> subscription), | |
1514 void onCancel(StreamSubscription<T> subscription)}) | |
1515 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | |
1516 | |
1517 StreamSubscription<T> listen(void onData(T value), | |
1518 { Function onError, | |
1519 void onDone(), | |
1520 bool cancelOnError }) { | |
1521 return _stream.listen(onData, onError: onError, onDone: onDone, | |
1522 cancelOnError: cancelOnError); | |
1523 } | |
1524 } | |
1525 | |
1526 | |
1527 /** | |
1528 * Abstract interface for a "sink" accepting multiple entire streams. | |
1529 * | |
1530 * A consumer can accept a number of consecutive streams using [addStream], | |
1531 * and when no further data need to be added, the [close] method tells the | |
1532 * consumer to complete its work and shut down. | |
1533 * | |
1534 * This class is not just a [Sink<Stream>] because it is also combined with | |
1535 * other [Sink] classes, like it's combined with [EventSink] in the | |
1536 * [StreamSink] class. | |
1537 * | |
1538 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream | |
1539 * to the consumer's [addStream] method. When that completes, it will | |
1540 * call [close] and then complete its own returned future. | |
1541 */ | |
1542 abstract class StreamConsumer<S> { | |
1543 /** | |
1544 * Consumes the elements of [stream]. | |
1545 * | |
1546 * Listens on [stream] and does something for each event. | |
1547 * | |
1548 * Returns a future which is completed when the stream is done being added, | |
1549 * and the consumer is ready to accept a new stream. | |
1550 * No further calls to [addStream] or [close] should happen before the | |
1551 * returned future has completed. | |
1552 * | |
1553 * The consumer may stop listening to the stream after an error, | |
1554 * it may consume all the errors and only stop at a done event, | |
1555 * or it may be canceled early if the receiver don't want any further events. | |
1556 * | |
1557 * If the consumer stops listening because of some error preventing it | |
1558 * from continuing, it may report this error in the returned future, | |
1559 * otherwise it will just complete the future with `null`. | |
1560 */ | |
1561 Future addStream(Stream<S> stream); | |
1562 | |
1563 /** | |
1564 * Tells the consumer that no further streams will be added. | |
1565 * | |
1566 * This allows the consumer to complete any remaining work and release | |
1567 * resources that are no longer needed | |
1568 * | |
1569 * Returns a future which is completed when the consumer has shut down. | |
1570 * If cleaning up can fail, the error may be reported in the returned future, | |
1571 * otherwise it completes with `null`. | |
1572 */ | |
1573 Future close(); | |
1574 } | |
1575 | |
1576 | |
1577 /** | |
1578 * A object that accepts stream events both synchronously and asynchronously. | |
1579 * | |
1580 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and | |
1581 * the synchronous methods from [EventSink]. | |
1582 * | |
1583 * The [EventSink] methods can't be used while the [addStream] is called. | |
1584 * As soon as the [addStream]'s [Future] completes with a value, the | |
1585 * [EventSink] methods can be used again. | |
1586 * | |
1587 * If [addStream] is called after any of the [EventSink] methods, it'll | |
1588 * be delayed until the underlying system has consumed the data added by the | |
1589 * [EventSink] methods. | |
1590 * | |
1591 * When [EventSink] methods are used, the [done] [Future] can be used to | |
1592 * catch any errors. | |
1593 * | |
1594 * When [close] is called, it will return the [done] [Future]. | |
1595 */ | |
1596 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { | |
1597 /** | |
1598 * Tells the stream sink that no further streams will be added. | |
1599 * | |
1600 * This allows the stream sink to complete any remaining work and release | |
1601 * resources that are no longer needed | |
1602 * | |
1603 * Returns a future which is completed when the stream sink has shut down. | |
1604 * If cleaning up can fail, the error may be reported in the returned future, | |
1605 * otherwise it completes with `null`. | |
1606 * | |
1607 * Returns the same future as [done]. | |
1608 * | |
1609 * The stream sink may close before the [close] method is called, either due | |
1610 * to an error or because it is itself providing events to someone who has | |
1611 * stopped listening. In that case, the [done] future is completed first, | |
1612 * and the `close` method will return the `done` future when called. | |
1613 * | |
1614 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their | |
1615 * object as not expecting any further events. | |
1616 */ | |
1617 Future close(); | |
1618 | |
1619 /** | |
1620 * Return a future which is completed when the [StreamSink] is finished. | |
1621 * | |
1622 * If the `StreamSink` fails with an error, | |
1623 * perhaps in response to adding events using [add], [addError] or [close], | |
1624 * the [done] future will complete with that error. | |
1625 * | |
1626 * Otherwise, the returned future will complete when either: | |
1627 * | |
1628 * * all events have been processed and the sink has been closed, or | |
1629 * * the sink has otherwise been stopped from handling more events | |
1630 * (for example by cancelling a stream subscription). | |
1631 */ | |
1632 Future get done; | |
1633 } | |
1634 | |
1635 | |
1636 /** | |
1637 * The target of a [Stream.transform] call. | |
1638 * | |
1639 * The [Stream.transform] call will pass itself to this object and then return | |
1640 * the resulting stream. | |
1641 * | |
1642 * It is good practice to write transformers that can be used multiple times. | |
1643 */ | |
1644 abstract class StreamTransformer<S, T> { | |
1645 /** | |
1646 * Creates a [StreamTransformer]. | |
1647 * | |
1648 * The returned instance takes responsibility of implementing ([bind]). | |
1649 * When the user invokes `bind` it returns a new "bound" stream. Only when | |
1650 * the user starts listening to the bound stream, the `listen` method | |
1651 * invokes the given closure [transformer]. | |
1652 * | |
1653 * The [transformer] closure receives the stream, that was bound, as argument | |
1654 * and returns a [StreamSubscription]. In almost all cases the closure | |
1655 * listens itself to the stream that is given as argument. | |
1656 * | |
1657 * The result of invoking the [transformer] closure is a [StreamSubscription]. | |
1658 * The bound stream-transformer (created by the `bind` method above) then sets | |
1659 * the handlers it received as part of the `listen` call. | |
1660 * | |
1661 * Conceptually this can be summarized as follows: | |
1662 * | |
1663 * 1. `var transformer = new StreamTransformer(transformerClosure);` | |
1664 * creates a `StreamTransformer` that supports the `bind` method. | |
1665 * 2. `var boundStream = stream.transform(transformer);` binds the `stream` | |
1666 * and returns a bound stream that has a pointer to `stream`. | |
1667 * 3. `boundStream.listen(f1, onError: f2, onDone: f3, cancelOnError: b)` | |
1668 * starts the listening and transformation. This is accomplished | |
1669 * in 2 steps: first the `boundStream` invokes the `transformerClosure` with | |
1670 * the `stream` it captured: `transformerClosure(stream, b)`. | |
1671 * The result `subscription`, a [StreamSubscription], is then | |
1672 * updated to receive its handlers: `subscription.onData(f1)`, | |
1673 * `subscription.onError(f2)`, `subscription(f3)`. Finally the subscription | |
1674 * is returned as result of the `listen` call. | |
1675 * | |
1676 * There are two common ways to create a StreamSubscription: | |
1677 * | |
1678 * 1. by creating a new class that implements [StreamSubscription]. | |
1679 * Note that the subscription should run callbacks in the [Zone] the | |
1680 * stream was listened to. | |
1681 * 2. by allocating a [StreamController] and to return the result of | |
1682 * listening to its stream. | |
1683 * | |
1684 * Example use of a duplicating transformer: | |
1685 * | |
1686 * stringStream.transform(new StreamTransformer<String, String>( | |
1687 * (Stream<String> input, bool cancelOnError) { | |
1688 * StreamController<String> controller; | |
1689 * StreamSubscription<String> subscription; | |
1690 * controller = new StreamController<String>( | |
1691 * onListen: () { | |
1692 * subscription = input.listen((data) { | |
1693 * // Duplicate the data. | |
1694 * controller.add(data); | |
1695 * controller.add(data); | |
1696 * }, | |
1697 * onError: controller.addError, | |
1698 * onDone: controller.close, | |
1699 * cancelOnError: cancelOnError); | |
1700 * }, | |
1701 * onPause: () { subscription.pause(); }, | |
1702 * onResume: () { subscription.resume(); }, | |
1703 * onCancel: () { subscription.cancel(); }, | |
1704 * sync: true); | |
1705 * return controller.stream.listen(null); | |
1706 * }); | |
1707 */ | |
1708 const factory StreamTransformer( | |
1709 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | |
1710 = _StreamSubscriptionTransformer<S, T>; | |
1711 | |
1712 /** | |
1713 * Creates a [StreamTransformer] that delegates events to the given functions. | |
1714 * | |
1715 * Example use of a duplicating transformer: | |
1716 * | |
1717 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | |
1718 * handleData: (String value, EventSink<String> sink) { | |
1719 * sink.add(value); | |
1720 * sink.add(value); // Duplicate the incoming events. | |
1721 * })); | |
1722 */ | |
1723 factory StreamTransformer.fromHandlers({ | |
1724 void handleData(S data, EventSink<T> sink), | |
1725 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | |
1726 void handleDone(EventSink<T> sink)}) | |
1727 = _StreamHandlerTransformer<S, T>; | |
1728 | |
1729 /** | |
1730 * Transform the incoming [stream]'s events. | |
1731 * | |
1732 * Creates a new stream. | |
1733 * When this stream is listened to, it will start listening on [stream], | |
1734 * and generate events on the new stream based on the events from [stream]. | |
1735 * | |
1736 * Subscriptions on the returned stream should propagate pause state | |
1737 * to the subscription on [stream]. | |
1738 */ | |
1739 Stream<T> bind(Stream<S> stream); | |
1740 } | |
1741 | |
1742 /** | |
1743 * An [Iterator] like interface for the values of a [Stream]. | |
1744 * | |
1745 * This wraps a [Stream] and a subscription on the stream. It listens | |
1746 * on the stream, and completes the future returned by [moveNext] when the | |
1747 * next value becomes available. | |
1748 */ | |
1749 abstract class StreamIterator<T> { | |
1750 | |
1751 /** Create a [StreamIterator] on [stream]. */ | |
1752 factory StreamIterator(Stream<T> stream) | |
1753 // TODO(lrn): use redirecting factory constructor when type | |
1754 // arguments are supported. | |
1755 => new _StreamIteratorImpl<T>(stream); | |
1756 | |
1757 /** | |
1758 * Wait for the next stream value to be available. | |
1759 * | |
1760 * Returns a future which will complete with either `true` or `false`. | |
1761 * Completing with `true` means that another event has been received and | |
1762 * can be read as [current]. | |
1763 * Completing with `false` means that the stream itearation is done and | |
1764 * no further events will ever be available. | |
1765 * The future may complete with an error, if the stream produces an error, | |
1766 * which also ends iteration. | |
1767 * | |
1768 * The function must not be called again until the future returned by a | |
1769 * previous call is completed. | |
1770 */ | |
1771 Future<bool> moveNext(); | |
1772 | |
1773 /** | |
1774 * The current value of the stream. | |
1775 * | |
1776 * Is `null` before the first call to [moveNext] and after a call to | |
1777 * `moveNext` completes with a `false` result or an error. | |
1778 * | |
1779 * When a `moveNext` call completes with `true`, the `current` field holds | |
1780 * the most recent event of the stream, and it stays like that until the next | |
1781 * call to `moveNext`. | |
1782 * Between a call to `moveNext` and when its returned future completes, | |
1783 * the value is unspecified. | |
1784 */ | |
1785 T get current; | |
1786 | |
1787 /** | |
1788 * Cancels the stream iterator (and the underlying stream subscription) early. | |
1789 * | |
1790 * The stream iterator is automatically canceled if the [moveNext] future | |
1791 * completes with either `false` or an error. | |
1792 * | |
1793 * If you need to stop listening for values before the stream iterator is | |
1794 * automatically closed, you must call [cancel] to ensure that the stream | |
1795 * is properly closed. | |
1796 * | |
1797 * If [moveNext] has been called when the iterator is cancelled, | |
1798 * its returned future will complete with `false` as value, | |
1799 * as will all further calls to [moveNext]. | |
1800 * | |
1801 * Returns a future if the cancel-operation is not completed synchronously. | |
1802 * Otherwise returns `null`. | |
1803 */ | |
1804 Future cancel(); | |
1805 } | |
1806 | |
1807 | |
1808 /** | |
1809 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | |
1810 */ | |
1811 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | |
1812 EventSink _sink; | |
1813 _ControllerEventSinkWrapper(this._sink); | |
1814 | |
1815 void add(T data) { _sink.add(data); } | |
1816 void addError(error, [StackTrace stackTrace]) { | |
1817 _sink.addError(error, stackTrace); | |
1818 } | |
1819 void close() { _sink.close(); } | |
1820 } | |
OLD | NEW |