OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
| 11 typedef void _TimerCallback(); |
| 12 |
11 /** | 13 /** |
12 * A source of asynchronous data events. | 14 * A source of asynchronous data events. |
13 * | 15 * |
14 * A Stream provides a way to receive a sequence of events. | 16 * A Stream provides a way to receive a sequence of events. |
15 * Each event is either a data event or an error event, | 17 * Each event is either a data event or an error event, |
16 * representing the result of a single computation. | 18 * representing the result of a single computation. |
17 * When the events provided by a Stream have all been sent, | 19 * When the events provided by a Stream have all been sent, |
18 * a single "done" event will mark the end. | 20 * a single "done" event will mark the end. |
19 * | 21 * |
20 * You can [listen] on a stream to make it start generating events, | 22 * You can [listen] on a stream to make it start generating events, |
(...skipping 20 matching lines...) Expand all Loading... |
41 * | 43 * |
42 * *A broadcast stream* allows any number of listeners, and it fires | 44 * *A broadcast stream* allows any number of listeners, and it fires |
43 * its events when they are ready, whether there are listeners or not. | 45 * its events when they are ready, whether there are listeners or not. |
44 * | 46 * |
45 * Broadcast streams are used for independent events/observers. | 47 * Broadcast streams are used for independent events/observers. |
46 * | 48 * |
47 * If several listeners want to listen to a single subscription stream, | 49 * If several listeners want to listen to a single subscription stream, |
48 * use [asBroadcastStream] to create a broadcast stream on top of the | 50 * use [asBroadcastStream] to create a broadcast stream on top of the |
49 * non-broadcast stream. | 51 * non-broadcast stream. |
50 * | 52 * |
51 * On either kind of stream, stream transformationss, such as [where] and | 53 * On either kind of stream, stream transformations, such as [where] and |
52 * [skip], return the same type of stream as the one the method was called on, | 54 * [skip], return the same type of stream as the one the method was called on, |
53 * unless otherwise noted. | 55 * unless otherwise noted. |
54 * | 56 * |
55 * When an event is fired, the listener(s) at that time will receive the event. | 57 * When an event is fired, the listener(s) at that time will receive the event. |
56 * If a listener is added to a broadcast stream while an event is being fired, | 58 * If a listener is added to a broadcast stream while an event is being fired, |
57 * that listener will not receive the event currently being fired. | 59 * that listener will not receive the event currently being fired. |
58 * If a listener is canceled, it immediately stops receiving events. | 60 * If a listener is canceled, it immediately stops receiving events. |
59 * | 61 * |
60 * When the "done" event is fired, subscribers are unsubscribed before | 62 * When the "done" event is fired, subscribers are unsubscribed before |
61 * receiving the event. After the event has been sent, the stream has no | 63 * receiving the event. After the event has been sent, the stream has no |
62 * subscribers. Adding new subscribers to a broadcast stream after this point | 64 * subscribers. Adding new subscribers to a broadcast stream after this point |
63 * is allowed, but they will just receive a new "done" event as soon | 65 * is allowed, but they will just receive a new "done" event as soon |
64 * as possible. | 66 * as possible. |
65 * | 67 * |
66 * Stream subscriptions always respect "pause" requests. If necessary they need | 68 * Stream subscriptions always respect "pause" requests. If necessary they need |
67 * to buffer their input, but often, and preferably, they can simply request | 69 * to buffer their input, but often, and preferably, they can simply request |
68 * their input to pause too. | 70 * their input to pause too. |
69 * | 71 * |
70 * The default implementation of [isBroadcast] returns false. | 72 * The default implementation of [isBroadcast] returns false. |
71 * A broadcast stream inheriting from [Stream] must override [isBroadcast] | 73 * A broadcast stream inheriting from [Stream] must override [isBroadcast] |
72 * to return `true`. | 74 * to return `true`. |
73 */ | 75 */ |
74 abstract class Stream<T> { | 76 abstract class Stream<T> { |
75 Stream(); | 77 Stream(); |
76 | 78 |
77 /** | 79 /** |
| 80 * Internal use only. We do not want to promise that Stream stays const. |
| 81 * |
| 82 * If mixins become compatible with const constructors, we may use a |
| 83 * stream mixin instead of extending Stream from a const class. |
| 84 */ |
| 85 const Stream._internal(); |
| 86 |
| 87 /** |
| 88 * Creates an empty broadcast stream. |
| 89 * |
| 90 * This is a stream which does nothing except sending a done event |
| 91 * when it's listened to. |
| 92 */ |
| 93 const factory Stream.empty() = _EmptyStream<T>; |
| 94 |
| 95 /** |
78 * Creates a new single-subscription stream from the future. | 96 * Creates a new single-subscription stream from the future. |
79 * | 97 * |
80 * When the future completes, the stream will fire one event, either | 98 * When the future completes, the stream will fire one event, either |
81 * data or error, and then close with a done-event. | 99 * data or error, and then close with a done-event. |
82 */ | 100 */ |
83 factory Stream.fromFuture(Future<T> future) { | 101 factory Stream.fromFuture(Future<T> future) { |
84 // Use the controller's buffering to fill in the value even before | 102 // Use the controller's buffering to fill in the value even before |
85 // the stream has a listener. For a single value, it's not worth it | 103 // the stream has a listener. For a single value, it's not worth it |
86 // to wait for a listener before doing the `then` on the future. | 104 // to wait for a listener before doing the `then` on the future. |
87 _StreamController<T> controller = | 105 _StreamController<T> controller = new StreamController<T>(sync: true); |
88 new StreamController<T>(sync: true) as _StreamController<T>; | |
89 future.then((value) { | 106 future.then((value) { |
90 controller._add(value); | 107 controller._add(value); |
91 controller._closeUnchecked(); | 108 controller._closeUnchecked(); |
92 }, | 109 }, |
93 onError: (error, stackTrace) { | 110 onError: (error, stackTrace) { |
94 controller._addError(error, stackTrace); | 111 controller._addError(error, stackTrace); |
95 controller._closeUnchecked(); | 112 controller._closeUnchecked(); |
96 }); | 113 }); |
97 return controller.stream; | 114 return controller.stream; |
98 } | 115 } |
99 | 116 |
100 /** | 117 /** |
| 118 * Create a stream from a group of futures. |
| 119 * |
| 120 * The stream reports the results of the futures on the stream in the order |
| 121 * in which the futures complete. |
| 122 * |
| 123 * If some futures have completed before calling `Stream.fromFutures`, |
| 124 * their result will be output on the created stream in some unspecified |
| 125 * order. |
| 126 * |
| 127 * When all futures have completed, the stream is closed. |
| 128 * |
| 129 * If no future is passed, the stream closes as soon as possible. |
| 130 */ |
| 131 factory Stream.fromFutures(Iterable<Future<T>> futures) { |
| 132 _StreamController<T> controller = new StreamController<T>(sync: true); |
| 133 int count = 0; |
| 134 var onValue = (T value) { |
| 135 if (!controller.isClosed) { |
| 136 controller._add(value); |
| 137 if (--count == 0) controller._closeUnchecked(); |
| 138 } |
| 139 }; |
| 140 var onError = (error, stack) { |
| 141 if (!controller.isClosed) { |
| 142 controller._addError(error, stack); |
| 143 if (--count == 0) controller._closeUnchecked(); |
| 144 } |
| 145 }; |
| 146 // The futures are already running, so start listening to them immediately |
| 147 // (instead of waiting for the stream to be listened on). |
| 148 // If we wait, we might not catch errors in the futures in time. |
| 149 for (var future in futures) { |
| 150 count++; |
| 151 future.then(onValue, onError: onError); |
| 152 } |
| 153 // Use schedule microtask since controller is sync. |
| 154 if (count == 0) scheduleMicrotask(controller.close); |
| 155 return controller.stream; |
| 156 } |
| 157 |
| 158 /** |
101 * Creates a single-subscription stream that gets its data from [data]. | 159 * Creates a single-subscription stream that gets its data from [data]. |
102 * | 160 * |
103 * The iterable is iterated when the stream receives a listener, and stops | 161 * The iterable is iterated when the stream receives a listener, and stops |
104 * iterating if the listener cancels the subscription. | 162 * iterating if the listener cancels the subscription. |
105 * | 163 * |
106 * If iterating [data] throws an error, the stream ends immediately with | 164 * If iterating [data] throws an error, the stream ends immediately with |
107 * that error. No done event will be sent (iteration is not complete), but no | 165 * that error. No done event will be sent (iteration is not complete), but no |
108 * further data events will be generated either, since iteration cannot | 166 * further data events will be generated either, since iteration cannot |
109 * continue. | 167 * continue. |
110 */ | 168 */ |
111 factory Stream.fromIterable(Iterable<T> data) { | 169 factory Stream.fromIterable(Iterable<T> data) { |
112 return new _GeneratedStreamImpl<T>( | 170 return new _GeneratedStreamImpl<T>( |
113 () => new _IterablePendingEvents<T>(data)); | 171 () => new _IterablePendingEvents<T>(data)); |
114 } | 172 } |
115 | 173 |
116 /** | 174 /** |
117 * Creates a stream that repeatedly emits events at [period] intervals. | 175 * Creates a stream that repeatedly emits events at [period] intervals. |
118 * | 176 * |
119 * The event values are computed by invoking [computation]. The argument to | 177 * The event values are computed by invoking [computation]. The argument to |
120 * this callback is an integer that starts with 0 and is incremented for | 178 * this callback is an integer that starts with 0 and is incremented for |
121 * every event. | 179 * every event. |
122 * | 180 * |
123 * If [computation] is omitted the event values will all be `null`. | 181 * If [computation] is omitted the event values will all be `null`. |
124 */ | 182 */ |
125 factory Stream.periodic(Duration period, | 183 factory Stream.periodic(Duration period, |
126 [T computation(int computationCount)]) { | 184 [T computation(int computationCount)]) { |
127 if (computation == null) computation = ((i) => null); | |
128 | |
129 Timer timer; | 185 Timer timer; |
130 int computationCount = 0; | 186 int computationCount = 0; |
131 StreamController<T> controller; | 187 StreamController<T> controller; |
132 // Counts the time that the Stream was running (and not paused). | 188 // Counts the time that the Stream was running (and not paused). |
133 Stopwatch watch = new Stopwatch(); | 189 Stopwatch watch = new Stopwatch(); |
134 | 190 |
135 void sendEvent() { | 191 void sendEvent() { |
136 watch.reset(); | 192 watch.reset(); |
137 T data = computation(computationCount++); | 193 T data; |
| 194 if (computation != null) { |
| 195 try { |
| 196 data = computation(computationCount++); |
| 197 } catch (e, s) { |
| 198 controller.addError(e, s); |
| 199 return; |
| 200 } |
| 201 } |
138 controller.add(data); | 202 controller.add(data); |
139 } | 203 } |
140 | 204 |
141 void startPeriodicTimer() { | 205 void startPeriodicTimer() { |
142 assert(timer == null); | 206 assert(timer == null); |
143 timer = new Timer.periodic(period, (Timer timer) { | 207 timer = new Timer.periodic(period, (Timer timer) { |
144 sendEvent(); | 208 sendEvent(); |
145 }); | 209 }); |
146 } | 210 } |
147 | 211 |
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
190 * | 254 * |
191 * class DuplicationSink implements EventSink<String> { | 255 * class DuplicationSink implements EventSink<String> { |
192 * final EventSink<String> _outputSink; | 256 * final EventSink<String> _outputSink; |
193 * DuplicationSink(this._outputSink); | 257 * DuplicationSink(this._outputSink); |
194 * | 258 * |
195 * void add(String data) { | 259 * void add(String data) { |
196 * _outputSink.add(data); | 260 * _outputSink.add(data); |
197 * _outputSink.add(data); | 261 * _outputSink.add(data); |
198 * } | 262 * } |
199 * | 263 * |
200 * void addError(e, [st]) => _outputSink(e, st); | 264 * void addError(e, [st]) { _outputSink.addError(e, st); } |
201 * void close() => _outputSink.close(); | 265 * void close() { _outputSink.close(); } |
202 * } | 266 * } |
203 * | 267 * |
204 * class DuplicationTransformer implements StreamTransformer<String, Strin
g> { | 268 * class DuplicationTransformer implements StreamTransformer<String, Strin
g> { |
205 * // Some generic types ommitted for brevety. | 269 * // Some generic types ommitted for brevety. |
206 * Stream bind(Stream stream) => new Stream<String>.eventTransform( | 270 * Stream bind(Stream stream) => new Stream<String>.eventTransformed( |
207 * stream, | 271 * stream, |
208 * (EventSink sink) => new DuplicationSink(sink)); | 272 * (EventSink sink) => new DuplicationSink(sink)); |
209 * } | 273 * } |
210 * | 274 * |
211 * stringStream.transform(new DuplicationTransformer()); | 275 * stringStream.transform(new DuplicationTransformer()); |
212 * | 276 * |
213 * The resulting stream is a broadcast stream if [source] is. | 277 * The resulting stream is a broadcast stream if [source] is. |
214 */ | 278 */ |
215 factory Stream.eventTransformed(Stream source, | 279 factory Stream.eventTransformed(Stream source, |
216 EventSink mapSink(EventSink<T> sink)) { | 280 EventSink mapSink(EventSink<T> sink)) { |
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
256 * is called. If [onData] is null, nothing happens. | 320 * is called. If [onData] is null, nothing happens. |
257 * | 321 * |
258 * On errors from this stream, the [onError] handler is given a | 322 * On errors from this stream, the [onError] handler is given a |
259 * object describing the error. | 323 * object describing the error. |
260 * | 324 * |
261 * The [onError] callback must be of type `void onError(error)` or | 325 * The [onError] callback must be of type `void onError(error)` or |
262 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts | 326 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts |
263 * two arguments it is called with the stack trace (which could be `null` if | 327 * two arguments it is called with the stack trace (which could be `null` if |
264 * the stream itself received an error without stack trace). | 328 * the stream itself received an error without stack trace). |
265 * Otherwise it is called with just the error object. | 329 * Otherwise it is called with just the error object. |
| 330 * If [onError] is omitted, any errors on the stream are considered unhandled, |
| 331 * and will be passed to the current [Zone]'s error handler. |
| 332 * By default unhandled async errors are treated |
| 333 * as if they were uncaught top-level errors. |
266 * | 334 * |
267 * If this stream closes, the [onDone] handler is called. | 335 * If this stream closes, the [onDone] handler is called. |
268 * | 336 * |
269 * If [cancelOnError] is true, the subscription is ended when | 337 * If [cancelOnError] is true, the subscription is ended when |
270 * the first error is reported. The default is false. | 338 * the first error is reported. The default is false. |
271 */ | 339 */ |
272 StreamSubscription<T> listen(void onData(T event), | 340 StreamSubscription<T> listen(void onData(T event), |
273 { Function onError, | 341 { Function onError, |
274 void onDone(), | 342 void onDone(), |
275 bool cancelOnError}); | 343 bool cancelOnError}); |
276 | 344 |
277 /** | 345 /** |
278 * Creates a new stream from this stream that discards some data events. | 346 * Creates a new stream from this stream that discards some data events. |
279 * | 347 * |
280 * The new stream sends the same error and done events as this stream, | 348 * The new stream sends the same error and done events as this stream, |
281 * but it only sends the data events that satisfy the [test]. | 349 * but it only sends the data events that satisfy the [test]. |
282 * | 350 * |
283 * The returned stream is a broadcast stream if this stream is. | 351 * The returned stream is a broadcast stream if this stream is. |
284 * If a broadcast stream is listened to more than once, each subscription | 352 * If a broadcast stream is listened to more than once, each subscription |
285 * will individually perform the `test`. | 353 * will individually perform the `test`. |
286 */ | 354 */ |
287 Stream<T> where(bool test(T event)) { | 355 Stream<T> where(bool test(T event)) { |
288 return new _WhereStream<T>(this, test); | 356 return new _WhereStream<T>(this, test); |
289 } | 357 } |
290 | 358 |
291 /** | 359 /** |
292 * Creates a new stream that converts each element of this stream | 360 * Creates a new stream that converts each element of this stream |
293 * to a new value using the [convert] function. | 361 * to a new value using the [convert] function. |
294 * | 362 * |
| 363 * For each data event, `o`, in this stream, the returned stream |
| 364 * provides a data event with the value `convert(o)`. |
| 365 * If [convert] throws, the returned stream reports the exception as an error |
| 366 * event instead. |
| 367 * |
| 368 * Error and done events are passed through unchanged to the returned stream. |
| 369 * |
295 * The returned stream is a broadcast stream if this stream is. | 370 * The returned stream is a broadcast stream if this stream is. |
| 371 * The [convert] function is called once per data event per listener. |
296 * If a broadcast stream is listened to more than once, each subscription | 372 * If a broadcast stream is listened to more than once, each subscription |
297 * will individually execute `map` for each event. | 373 * will individually call [convert] on each data event. |
298 */ | 374 */ |
299 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { | 375 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { |
300 return new _MapStream<T, dynamic/*=S*/>(this, convert); | 376 return new _MapStream<T, dynamic/*=S*/>(this, convert); |
301 } | 377 } |
302 | 378 |
303 /** | 379 /** |
304 * Creates a new stream with each data event of this stream asynchronously | 380 * Creates a new stream with each data event of this stream asynchronously |
305 * mapped to a new event. | 381 * mapped to a new event. |
306 * | 382 * |
307 * This acts like [map], except that [convert] may return a [Future], | 383 * This acts like [map], except that [convert] may return a [Future], |
308 * and in that case, the stream waits for that future to complete before | 384 * and in that case, the stream waits for that future to complete before |
309 * continuing with its result. | 385 * continuing with its result. |
310 * | 386 * |
311 * The returned stream is a broadcast stream if this stream is. | 387 * The returned stream is a broadcast stream if this stream is. |
312 */ | 388 */ |
313 Stream asyncMap(convert(T event)) { | 389 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) { |
314 StreamController controller; | 390 StreamController/*<E>*/ controller; |
315 StreamSubscription subscription; | 391 StreamSubscription/*<T>*/ subscription; |
316 void onListen () { | 392 |
| 393 void onListen() { |
317 final add = controller.add; | 394 final add = controller.add; |
318 assert(controller is _StreamController || | 395 assert(controller is _StreamController || |
319 controller is _BroadcastStreamController); | 396 controller is _BroadcastStreamController); |
320 final eventSink = controller as _EventSink<T>; | 397 final _EventSink/*<E>*/ eventSink = |
| 398 controller as Object /*=_EventSink<E>*/; |
321 final addError = eventSink._addError; | 399 final addError = eventSink._addError; |
322 subscription = this.listen( | 400 subscription = this.listen( |
323 (T event) { | 401 (T event) { |
324 var newValue; | 402 dynamic newValue; |
325 try { | 403 try { |
326 newValue = convert(event); | 404 newValue = convert(event); |
327 } catch (e, s) { | 405 } catch (e, s) { |
328 controller.addError(e, s); | 406 controller.addError(e, s); |
329 return; | 407 return; |
330 } | 408 } |
331 if (newValue is Future) { | 409 if (newValue is Future) { |
332 subscription.pause(); | 410 subscription.pause(); |
333 newValue.then(add, onError: addError) | 411 newValue.then(add, onError: addError) |
334 .whenComplete(subscription.resume); | 412 .whenComplete(subscription.resume); |
335 } else { | 413 } else { |
336 controller.add(newValue); | 414 controller.add(newValue as Object/*=E*/); |
337 } | 415 } |
338 }, | 416 }, |
339 onError: addError, | 417 onError: addError, |
340 onDone: controller.close | 418 onDone: controller.close |
341 ); | 419 ); |
342 } | 420 } |
| 421 |
343 if (this.isBroadcast) { | 422 if (this.isBroadcast) { |
344 controller = new StreamController.broadcast( | 423 controller = new StreamController/*<E>*/.broadcast( |
345 onListen: onListen, | 424 onListen: onListen, |
346 onCancel: () { subscription.cancel(); }, | 425 onCancel: () { subscription.cancel(); }, |
347 sync: true | 426 sync: true |
348 ); | 427 ); |
349 } else { | 428 } else { |
350 controller = new StreamController( | 429 controller = new StreamController/*<E>*/( |
351 onListen: onListen, | 430 onListen: onListen, |
352 onPause: () { subscription.pause(); }, | 431 onPause: () { subscription.pause(); }, |
353 onResume: () { subscription.resume(); }, | 432 onResume: () { subscription.resume(); }, |
354 onCancel: () { subscription.cancel(); }, | 433 onCancel: () { subscription.cancel(); }, |
355 sync: true | 434 sync: true |
356 ); | 435 ); |
357 } | 436 } |
358 return controller.stream; | 437 return controller.stream; |
359 } | 438 } |
360 | 439 |
361 /** | 440 /** |
362 * Creates a new stream with the events of a stream per original event. | 441 * Creates a new stream with the events of a stream per original event. |
363 * | 442 * |
364 * This acts like [expand], except that [convert] returns a [Stream] | 443 * This acts like [expand], except that [convert] returns a [Stream] |
365 * instead of an [Iterable]. | 444 * instead of an [Iterable]. |
366 * The events of the returned stream becomes the events of the returned | 445 * The events of the returned stream becomes the events of the returned |
367 * stream, in the order they are produced. | 446 * stream, in the order they are produced. |
368 * | 447 * |
369 * If [convert] returns `null`, no value is put on the output stream, | 448 * If [convert] returns `null`, no value is put on the output stream, |
370 * just as if it returned an empty stream. | 449 * just as if it returned an empty stream. |
371 * | 450 * |
372 * The returned stream is a broadcast stream if this stream is. | 451 * The returned stream is a broadcast stream if this stream is. |
373 */ | 452 */ |
374 Stream asyncExpand(Stream convert(T event)) { | 453 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) { |
375 StreamController controller; | 454 StreamController/*<E>*/ controller; |
376 StreamSubscription subscription; | 455 StreamSubscription<T> subscription; |
377 void onListen() { | 456 void onListen() { |
378 assert(controller is _StreamController || | 457 assert(controller is _StreamController || |
379 controller is _BroadcastStreamController); | 458 controller is _BroadcastStreamController); |
380 final eventSink = controller as _EventSink<T>; | 459 final _EventSink/*<E>*/ eventSink = |
| 460 controller as Object /*=_EventSink<E>*/; |
381 subscription = this.listen( | 461 subscription = this.listen( |
382 (T event) { | 462 (T event) { |
383 Stream newStream; | 463 Stream/*<E>*/ newStream; |
384 try { | 464 try { |
385 newStream = convert(event); | 465 newStream = convert(event); |
386 } catch (e, s) { | 466 } catch (e, s) { |
387 controller.addError(e, s); | 467 controller.addError(e, s); |
388 return; | 468 return; |
389 } | 469 } |
390 if (newStream != null) { | 470 if (newStream != null) { |
391 subscription.pause(); | 471 subscription.pause(); |
392 controller.addStream(newStream) | 472 controller.addStream(newStream) |
393 .whenComplete(subscription.resume); | 473 .whenComplete(subscription.resume); |
394 } | 474 } |
395 }, | 475 }, |
396 onError: eventSink._addError, // Avoid Zone error replacement. | 476 onError: eventSink._addError, // Avoid Zone error replacement. |
397 onDone: controller.close | 477 onDone: controller.close |
398 ); | 478 ); |
399 } | 479 } |
400 if (this.isBroadcast) { | 480 if (this.isBroadcast) { |
401 controller = new StreamController.broadcast( | 481 controller = new StreamController/*<E>*/.broadcast( |
402 onListen: onListen, | 482 onListen: onListen, |
403 onCancel: () { subscription.cancel(); }, | 483 onCancel: () { subscription.cancel(); }, |
404 sync: true | 484 sync: true |
405 ); | 485 ); |
406 } else { | 486 } else { |
407 controller = new StreamController( | 487 controller = new StreamController/*<E>*/( |
408 onListen: onListen, | 488 onListen: onListen, |
409 onPause: () { subscription.pause(); }, | 489 onPause: () { subscription.pause(); }, |
410 onResume: () { subscription.resume(); }, | 490 onResume: () { subscription.resume(); }, |
411 onCancel: () { subscription.cancel(); }, | 491 onCancel: () { subscription.cancel(); }, |
412 sync: true | 492 sync: true |
413 ); | 493 ); |
414 } | 494 } |
415 return controller.stream; | 495 return controller.stream; |
416 } | 496 } |
417 | 497 |
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
456 * | 536 * |
457 * The returned stream is a broadcast stream if this stream is. | 537 * The returned stream is a broadcast stream if this stream is. |
458 * If a broadcast stream is listened to more than once, each subscription | 538 * If a broadcast stream is listened to more than once, each subscription |
459 * will individually call `convert` and expand the events. | 539 * will individually call `convert` and expand the events. |
460 */ | 540 */ |
461 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { | 541 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { |
462 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); | 542 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); |
463 } | 543 } |
464 | 544 |
465 /** | 545 /** |
466 * Binds this stream as the input of the provided [StreamConsumer]. | 546 * Pipe the events of this stream into [streamConsumer]. |
467 * | 547 * |
468 * The `streamConsumer` is closed when the stream has been added to it. | 548 * The events of this stream are added to `streamConsumer` using |
| 549 * [StreamConsumer.addStream]. |
| 550 * The `streamConsumer` is closed when this stream has been successfully added |
| 551 * to it - when the future returned by `addStream` completes without an error. |
469 * | 552 * |
470 * Returns a future which completes when the stream has been consumed | 553 * Returns a future which completes when the stream has been consumed |
471 * and the consumer has been closed. | 554 * and the consumer has been closed. |
| 555 * |
| 556 * The returned future completes with the same result as the future returned |
| 557 * by [StreamConsumer.close]. |
| 558 * If the adding of the stream itself fails in some way, |
| 559 * then the consumer is expected to be closed, and won't be closed again. |
| 560 * In that case the returned future completes with the error from calling |
| 561 * `addStream`. |
472 */ | 562 */ |
473 Future pipe(StreamConsumer<T> streamConsumer) { | 563 Future pipe(StreamConsumer<T> streamConsumer) { |
474 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); | 564 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); |
475 } | 565 } |
476 | 566 |
477 /** | 567 /** |
478 * Chains this stream as the input of the provided [StreamTransformer]. | 568 * Chains this stream as the input of the provided [StreamTransformer]. |
479 * | 569 * |
480 * Returns the result of [:streamTransformer.bind:] itself. | 570 * Returns the result of [:streamTransformer.bind:] itself. |
481 * | 571 * |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
519 } | 609 } |
520 }, | 610 }, |
521 cancelOnError: true | 611 cancelOnError: true |
522 ); | 612 ); |
523 return result; | 613 return result; |
524 } | 614 } |
525 | 615 |
526 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 616 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
527 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, | 617 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, |
528 /*=S*/ combine(var/*=S*/ previous, T element)) { | 618 /*=S*/ combine(var/*=S*/ previous, T element)) { |
529 _Future/*<S>*/ result = new _Future(); | 619 |
530 var value = initialValue; | 620 _Future/*<S>*/ result = new _Future/*<S>*/(); |
| 621 var/*=S*/ value = initialValue; |
531 StreamSubscription subscription; | 622 StreamSubscription subscription; |
532 subscription = this.listen( | 623 subscription = this.listen( |
533 (T element) { | 624 (T element) { |
534 _runUserCode( | 625 _runUserCode( |
535 () => combine(value, element), | 626 () => combine(value, element), |
536 (newValue) { value = newValue; }, | 627 (/*=S*/ newValue) { value = newValue; }, |
537 _cancelAndErrorClosure(subscription, result) | 628 _cancelAndErrorClosure(subscription, result) |
538 ); | 629 ); |
539 }, | 630 }, |
540 onError: (e, st) { | 631 onError: (e, st) { |
541 result._completeError(e, st); | 632 result._completeError(e, st); |
542 }, | 633 }, |
543 onDone: () { | 634 onDone: () { |
544 result._complete(value); | 635 result._complete(value); |
545 }, | 636 }, |
546 cancelOnError: true); | 637 cancelOnError: true); |
(...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
788 * Discards all data on the stream, but signals when it's done or an error | 879 * Discards all data on the stream, but signals when it's done or an error |
789 * occured. | 880 * occured. |
790 * | 881 * |
791 * When subscribing using [drain], cancelOnError will be true. This means | 882 * When subscribing using [drain], cancelOnError will be true. This means |
792 * that the future will complete with the first error on the stream and then | 883 * that the future will complete with the first error on the stream and then |
793 * cancel the subscription. | 884 * cancel the subscription. |
794 * | 885 * |
795 * In case of a `done` event the future completes with the given | 886 * In case of a `done` event the future completes with the given |
796 * [futureValue]. | 887 * [futureValue]. |
797 */ | 888 */ |
798 Future drain([var futureValue]) => listen(null, cancelOnError: true) | 889 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue]) |
799 .asFuture(futureValue); | 890 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue); |
800 | 891 |
801 /** | 892 /** |
802 * Provides at most the first [n] values of this stream. | 893 * Provides at most the first [n] values of this stream. |
803 * | 894 * |
804 * Forwards the first [n] data events of this stream, and all error | 895 * Forwards the first [n] data events of this stream, and all error |
805 * events, to the returned stream, and ends with a done event. | 896 * events, to the returned stream, and ends with a done event. |
806 * | 897 * |
807 * If this stream produces fewer than [count] values before it's done, | 898 * If this stream produces fewer than [count] values before it's done, |
808 * so will the returned stream. | 899 * so will the returned stream. |
809 * | 900 * |
810 * Stops listening to the stream after the first [n] elements have been | 901 * Stops listening to the stream after the first [n] elements have been |
811 * received. | 902 * received. |
812 * | 903 * |
813 * Internally the method cancels its subscription after these elements. This | 904 * Internally the method cancels its subscription after these elements. This |
814 * means that single-subscription (non-broadcast) streams are closed and | 905 * means that single-subscription (non-broadcast) streams are closed and |
815 * cannot be reused after a call to this method. | 906 * cannot be reused after a call to this method. |
816 * | 907 * |
817 * The returned stream is a broadcast stream if this stream is. | 908 * The returned stream is a broadcast stream if this stream is. |
818 * For a broadcast stream, the events are only counted from the time | 909 * For a broadcast stream, the events are only counted from the time |
819 * the returned stream is listened to. | 910 * the returned stream is listened to. |
820 */ | 911 */ |
821 Stream<T> take(int count) { | 912 Stream<T> take(int count) { |
822 return new _TakeStream(this, count); | 913 return new _TakeStream<T>(this, count); |
823 } | 914 } |
824 | 915 |
825 /** | 916 /** |
826 * Forwards data events while [test] is successful. | 917 * Forwards data events while [test] is successful. |
827 * | 918 * |
828 * The returned stream provides the same events as this stream as long | 919 * The returned stream provides the same events as this stream as long |
829 * as [test] returns [:true:] for the event data. The stream is done | 920 * as [test] returns [:true:] for the event data. The stream is done |
830 * when either this stream is done, or when this stream first provides | 921 * when either this stream is done, or when this stream first provides |
831 * a value that [test] doesn't accept. | 922 * a value that [test] doesn't accept. |
832 * | 923 * |
833 * Stops listening to the stream after the accepted elements. | 924 * Stops listening to the stream after the accepted elements. |
834 * | 925 * |
835 * Internally the method cancels its subscription after these elements. This | 926 * Internally the method cancels its subscription after these elements. This |
836 * means that single-subscription (non-broadcast) streams are closed and | 927 * means that single-subscription (non-broadcast) streams are closed and |
837 * cannot be reused after a call to this method. | 928 * cannot be reused after a call to this method. |
838 * | 929 * |
839 * The returned stream is a broadcast stream if this stream is. | 930 * The returned stream is a broadcast stream if this stream is. |
840 * For a broadcast stream, the events are only tested from the time | 931 * For a broadcast stream, the events are only tested from the time |
841 * the returned stream is listened to. | 932 * the returned stream is listened to. |
842 */ | 933 */ |
843 Stream<T> takeWhile(bool test(T element)) { | 934 Stream<T> takeWhile(bool test(T element)) { |
844 return new _TakeWhileStream(this, test); | 935 return new _TakeWhileStream<T>(this, test); |
845 } | 936 } |
846 | 937 |
847 /** | 938 /** |
848 * Skips the first [count] data events from this stream. | 939 * Skips the first [count] data events from this stream. |
849 * | 940 * |
850 * The returned stream is a broadcast stream if this stream is. | 941 * The returned stream is a broadcast stream if this stream is. |
851 * For a broadcast stream, the events are only counted from the time | 942 * For a broadcast stream, the events are only counted from the time |
852 * the returned stream is listened to. | 943 * the returned stream is listened to. |
853 */ | 944 */ |
854 Stream<T> skip(int count) { | 945 Stream<T> skip(int count) { |
855 return new _SkipStream(this, count); | 946 return new _SkipStream<T>(this, count); |
856 } | 947 } |
857 | 948 |
858 /** | 949 /** |
859 * Skip data events from this stream while they are matched by [test]. | 950 * Skip data events from this stream while they are matched by [test]. |
860 * | 951 * |
861 * Error and done events are provided by the returned stream unmodified. | 952 * Error and done events are provided by the returned stream unmodified. |
862 * | 953 * |
863 * Starting with the first data event where [test] returns false for the | 954 * Starting with the first data event where [test] returns false for the |
864 * event data, the returned stream will have the same events as this stream. | 955 * event data, the returned stream will have the same events as this stream. |
865 * | 956 * |
866 * The returned stream is a broadcast stream if this stream is. | 957 * The returned stream is a broadcast stream if this stream is. |
867 * For a broadcast stream, the events are only tested from the time | 958 * For a broadcast stream, the events are only tested from the time |
868 * the returned stream is listened to. | 959 * the returned stream is listened to. |
869 */ | 960 */ |
870 Stream<T> skipWhile(bool test(T element)) { | 961 Stream<T> skipWhile(bool test(T element)) { |
871 return new _SkipWhileStream(this, test); | 962 return new _SkipWhileStream<T>(this, test); |
872 } | 963 } |
873 | 964 |
874 /** | 965 /** |
875 * Skips data events if they are equal to the previous data event. | 966 * Skips data events if they are equal to the previous data event. |
876 * | 967 * |
877 * The returned stream provides the same events as this stream, except | 968 * The returned stream provides the same events as this stream, except |
878 * that it never provides two consequtive data events that are equal. | 969 * that it never provides two consecutive data events that are equal. |
879 * | 970 * |
880 * Equality is determined by the provided [equals] method. If that is | 971 * Equality is determined by the provided [equals] method. If that is |
881 * omitted, the '==' operator on the last provided data element is used. | 972 * omitted, the '==' operator on the last provided data element is used. |
882 * | 973 * |
883 * The returned stream is a broadcast stream if this stream is. | 974 * The returned stream is a broadcast stream if this stream is. |
884 * If a broadcast stream is listened to more than once, each subscription | 975 * If a broadcast stream is listened to more than once, each subscription |
885 * will individually perform the `equals` test. | 976 * will individually perform the `equals` test. |
886 */ | 977 */ |
887 Stream<T> distinct([bool equals(T previous, T next)]) { | 978 Stream<T> distinct([bool equals(T previous, T next)]) { |
888 return new _DistinctStream(this, equals); | 979 return new _DistinctStream<T>(this, equals); |
889 } | 980 } |
890 | 981 |
891 /** | 982 /** |
892 * Returns the first element of the stream. | 983 * Returns the first element of the stream. |
893 * | 984 * |
894 * Stops listening to the stream after the first element has been received. | 985 * Stops listening to the stream after the first element has been received. |
895 * | 986 * |
896 * Internally the method cancels its subscription after the first element. | 987 * Internally the method cancels its subscription after the first element. |
897 * This means that single-subscription (non-broadcast) streams are closed | 988 * This means that single-subscription (non-broadcast) streams are closed |
898 * and cannot be reused after a call to this getter. | 989 * and cannot be reused after a call to this getter. |
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
931 * If an error event occurs before the first data event, the resulting future | 1022 * If an error event occurs before the first data event, the resulting future |
932 * is completed with that error. | 1023 * is completed with that error. |
933 * | 1024 * |
934 * If this stream is empty (a done event occurs before the first data event), | 1025 * If this stream is empty (a done event occurs before the first data event), |
935 * the resulting future completes with a [StateError]. | 1026 * the resulting future completes with a [StateError]. |
936 */ | 1027 */ |
937 Future<T> get last { | 1028 Future<T> get last { |
938 _Future<T> future = new _Future<T>(); | 1029 _Future<T> future = new _Future<T>(); |
939 T result = null; | 1030 T result = null; |
940 bool foundResult = false; | 1031 bool foundResult = false; |
941 StreamSubscription subscription; | 1032 listen( |
942 subscription = this.listen( | |
943 (T value) { | 1033 (T value) { |
944 foundResult = true; | 1034 foundResult = true; |
945 result = value; | 1035 result = value; |
946 }, | 1036 }, |
947 onError: future._completeError, | 1037 onError: future._completeError, |
948 onDone: () { | 1038 onDone: () { |
949 if (foundResult) { | 1039 if (foundResult) { |
950 future._complete(result); | 1040 future._complete(result); |
951 return; | 1041 return; |
952 } | 1042 } |
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1201 * This `EventSink` is only valid during the call to `onTimeout`. | 1291 * This `EventSink` is only valid during the call to `onTimeout`. |
1202 * | 1292 * |
1203 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException] | 1293 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException] |
1204 * into the error channel of the returned stream. | 1294 * into the error channel of the returned stream. |
1205 * | 1295 * |
1206 * The returned stream is a broadcast stream if this stream is. | 1296 * The returned stream is a broadcast stream if this stream is. |
1207 * If a broadcast stream is listened to more than once, each subscription | 1297 * If a broadcast stream is listened to more than once, each subscription |
1208 * will have its individually timer that starts counting on listen, | 1298 * will have its individually timer that starts counting on listen, |
1209 * and the subscriptions' timers can be paused individually. | 1299 * and the subscriptions' timers can be paused individually. |
1210 */ | 1300 */ |
1211 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) { | 1301 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) { |
1212 StreamController controller; | 1302 StreamController<T> controller; |
1213 // The following variables are set on listen. | 1303 // The following variables are set on listen. |
1214 StreamSubscription<T> subscription; | 1304 StreamSubscription<T> subscription; |
1215 Timer timer; | 1305 Timer timer; |
1216 Zone zone; | 1306 Zone zone; |
1217 Function timeout2; | 1307 _TimerCallback timeout; |
1218 | 1308 |
1219 void onData(T event) { | 1309 void onData(T event) { |
1220 timer.cancel(); | 1310 timer.cancel(); |
1221 controller.add(event); | 1311 controller.add(event); |
1222 timer = zone.createTimer(timeLimit, timeout2); | 1312 timer = zone.createTimer(timeLimit, timeout); |
1223 } | 1313 } |
1224 void onError(error, StackTrace stackTrace) { | 1314 void onError(error, StackTrace stackTrace) { |
1225 timer.cancel(); | 1315 timer.cancel(); |
1226 assert(controller is _StreamController || | 1316 assert(controller is _StreamController || |
1227 controller is _BroadcastStreamController); | 1317 controller is _BroadcastStreamController); |
1228 var eventSink = controller as _EventSink<T>; | 1318 dynamic eventSink = controller; |
1229 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. | 1319 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. |
1230 timer = zone.createTimer(timeLimit, timeout2); | 1320 timer = zone.createTimer(timeLimit, timeout); |
1231 } | 1321 } |
1232 void onDone() { | 1322 void onDone() { |
1233 timer.cancel(); | 1323 timer.cancel(); |
1234 controller.close(); | 1324 controller.close(); |
1235 } | 1325 } |
1236 void onListen() { | 1326 void onListen() { |
1237 // This is the onListen callback for of controller. | 1327 // This is the onListen callback for of controller. |
1238 // It runs in the same zone that the subscription was created in. | 1328 // It runs in the same zone that the subscription was created in. |
1239 // Use that zone for creating timers and running the onTimeout | 1329 // Use that zone for creating timers and running the onTimeout |
1240 // callback. | 1330 // callback. |
1241 zone = Zone.current; | 1331 zone = Zone.current; |
1242 if (onTimeout == null) { | 1332 if (onTimeout == null) { |
1243 timeout2 = () { | 1333 timeout = () { |
1244 controller.addError(new TimeoutException("No stream event", | 1334 controller.addError(new TimeoutException("No stream event", |
1245 timeLimit), null); | 1335 timeLimit), null); |
1246 }; | 1336 }; |
1247 } else { | 1337 } else { |
1248 onTimeout = zone.registerUnaryCallback(onTimeout); | 1338 // TODO(floitsch): the return type should be 'void', and the type |
| 1339 // should be inferred. |
| 1340 var registeredOnTimeout = |
| 1341 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout); |
1249 _ControllerEventSinkWrapper wrapper = | 1342 _ControllerEventSinkWrapper wrapper = |
1250 new _ControllerEventSinkWrapper(null); | 1343 new _ControllerEventSinkWrapper(null); |
1251 timeout2 = () { | 1344 timeout = () { |
1252 wrapper._sink = controller; // Only valid during call. | 1345 wrapper._sink = controller; // Only valid during call. |
1253 zone.runUnaryGuarded(onTimeout, wrapper); | 1346 zone.runUnaryGuarded(registeredOnTimeout, wrapper); |
1254 wrapper._sink = null; | 1347 wrapper._sink = null; |
1255 }; | 1348 }; |
1256 } | 1349 } |
1257 | 1350 |
1258 subscription = this.listen(onData, onError: onError, onDone: onDone); | 1351 subscription = this.listen(onData, onError: onError, onDone: onDone); |
1259 timer = zone.createTimer(timeLimit, timeout2); | 1352 timer = zone.createTimer(timeLimit, timeout); |
1260 } | 1353 } |
1261 Future onCancel() { | 1354 Future onCancel() { |
1262 timer.cancel(); | 1355 timer.cancel(); |
1263 Future result = subscription.cancel(); | 1356 Future result = subscription.cancel(); |
1264 subscription = null; | 1357 subscription = null; |
1265 return result; | 1358 return result; |
1266 } | 1359 } |
1267 controller = isBroadcast | 1360 controller = isBroadcast |
1268 ? new _SyncBroadcastStreamController(onListen, onCancel) | 1361 ? new _SyncBroadcastStreamController<T>(onListen, onCancel) |
1269 : new _SyncStreamController( | 1362 : new _SyncStreamController<T>( |
1270 onListen, | 1363 onListen, |
1271 () { | 1364 () { |
1272 // Don't null the timer, onCancel may call cancel again. | 1365 // Don't null the timer, onCancel may call cancel again. |
1273 timer.cancel(); | 1366 timer.cancel(); |
1274 subscription.pause(); | 1367 subscription.pause(); |
1275 }, | 1368 }, |
1276 () { | 1369 () { |
1277 subscription.resume(); | 1370 subscription.resume(); |
1278 timer = zone.createTimer(timeLimit, timeout2); | 1371 timer = zone.createTimer(timeLimit, timeout); |
1279 }, | 1372 }, |
1280 onCancel); | 1373 onCancel); |
1281 return controller.stream; | 1374 return controller.stream; |
1282 } | 1375 } |
1283 } | 1376 } |
1284 | 1377 |
1285 /** | 1378 /** |
1286 * A subscritption on events from a [Stream]. | 1379 * A subscription on events from a [Stream]. |
1287 * | 1380 * |
1288 * When you listen on a [Stream] using [Stream.listen], | 1381 * When you listen on a [Stream] using [Stream.listen], |
1289 * a [StreamSubscription] object is returned. | 1382 * a [StreamSubscription] object is returned. |
1290 * | 1383 * |
1291 * The subscription provides events to the listener, | 1384 * The subscription provides events to the listener, |
1292 * and holds the callbacks used to handle the events. | 1385 * and holds the callbacks used to handle the events. |
1293 * The subscription can also be used to unsubscribe from the events, | 1386 * The subscription can also be used to unsubscribe from the events, |
1294 * or to temporarily pause the events from the stream. | 1387 * or to temporarily pause the events from the stream. |
1295 */ | 1388 */ |
1296 abstract class StreamSubscription<T> { | 1389 abstract class StreamSubscription<T> { |
1297 /** | 1390 /** |
1298 * Cancels this subscription. It will no longer receive events. | 1391 * Cancels this subscription. |
1299 * | 1392 * |
1300 * May return a future which completes when the stream is done cleaning up. | 1393 * After this call, the subscription no longer receives events. |
1301 * This can be used if the stream needs to release some resources | |
1302 * that are needed for a following operation, | |
1303 * for example a file being read, that should be deleted afterwards. | |
1304 * In that case, the file may not be able to be deleted successfully | |
1305 * until the returned future has completed. | |
1306 * | 1394 * |
1307 * The future will be completed with a `null` value. | 1395 * The stream may need to shut down the source of events and clean up after |
| 1396 * the subscription is canceled. |
| 1397 * |
| 1398 * Returns a future that is completed once the stream has finished |
| 1399 * its cleanup. May also return `null` if no cleanup was necessary. |
| 1400 * |
| 1401 * Typically, futures are returned when the stream needs to release resources. |
| 1402 * For example, a stream might need to close an open file (as an asynchronous |
| 1403 * operation). If the listener wants to delete the file after having |
| 1404 * canceled the subscription, it must wait for the cleanup future to complete. |
| 1405 * |
| 1406 * A returned future completes with a `null` value. |
1308 * If the cleanup throws, which it really shouldn't, the returned future | 1407 * If the cleanup throws, which it really shouldn't, the returned future |
1309 * will be completed with that error. | 1408 * completes with that error. |
1310 * | |
1311 * Returns `null` if there is no need to wait. | |
1312 */ | 1409 */ |
1313 Future cancel(); | 1410 Future cancel(); |
1314 | 1411 |
1315 /** | 1412 /** |
1316 * Set or override the data event handler of this subscription. | 1413 * Set or override the data event handler of this subscription. |
1317 * | 1414 * |
1318 * This method overrides the handler that has been set at the invocation of | 1415 * This method overrides the handler that has been set at the invocation of |
1319 * [Stream.listen]. | 1416 * [Stream.listen]. |
1320 */ | 1417 */ |
1321 void onData(void handleData(T data)); | 1418 void onData(void handleData(T data)); |
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1378 * | 1475 * |
1379 * This method *overwrites* the existing [onDone] and [onError] callbacks | 1476 * This method *overwrites* the existing [onDone] and [onError] callbacks |
1380 * with new ones that complete the returned future. | 1477 * with new ones that complete the returned future. |
1381 * | 1478 * |
1382 * In case of an error the subscription will automatically cancel (even | 1479 * In case of an error the subscription will automatically cancel (even |
1383 * when it was listening with `cancelOnError` set to `false`). | 1480 * when it was listening with `cancelOnError` set to `false`). |
1384 * | 1481 * |
1385 * In case of a `done` event the future completes with the given | 1482 * In case of a `done` event the future completes with the given |
1386 * [futureValue]. | 1483 * [futureValue]. |
1387 */ | 1484 */ |
1388 Future asFuture([var futureValue]); | 1485 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]); |
1389 } | 1486 } |
1390 | 1487 |
1391 | 1488 |
1392 /** | 1489 /** |
1393 * An interface that abstracts creation or handling of [Stream] events. | 1490 * An interface that abstracts creation or handling of [Stream] events. |
1394 */ | 1491 */ |
1395 abstract class EventSink<T> implements Sink<T> { | 1492 abstract class EventSink<T> implements Sink<T> { |
1396 /** Send a data event to a stream. */ | 1493 /** Send a data event to a stream. */ |
1397 void add(T event); | 1494 void add(T event); |
1398 | 1495 |
1399 /** Send an async error to a stream. */ | 1496 /** Send an async error to a stream. */ |
1400 void addError(errorEvent, [StackTrace stackTrace]); | 1497 void addError(errorEvent, [StackTrace stackTrace]); |
1401 | 1498 |
1402 /** Close the sink. No further events can be added after closing. */ | 1499 /** Close the sink. No further events can be added after closing. */ |
1403 void close(); | 1500 void close(); |
1404 } | 1501 } |
1405 | 1502 |
1406 | 1503 |
1407 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 1504 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
1408 class StreamView<T> extends Stream<T> { | 1505 class StreamView<T> extends Stream<T> { |
1409 Stream<T> _stream; | 1506 final Stream<T> _stream; |
1410 | 1507 |
1411 StreamView(this._stream); | 1508 const StreamView(Stream<T> stream) : _stream = stream, super._internal(); |
1412 | 1509 |
1413 bool get isBroadcast => _stream.isBroadcast; | 1510 bool get isBroadcast => _stream.isBroadcast; |
1414 | 1511 |
1415 Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription)
, | 1512 Stream<T> asBroadcastStream( |
1416 void onCancel(StreamSubscription<T> subscription)
}) | 1513 {void onListen(StreamSubscription<T> subscription), |
| 1514 void onCancel(StreamSubscription<T> subscription)}) |
1417 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); | 1515 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); |
1418 | 1516 |
1419 StreamSubscription<T> listen(void onData(T value), | 1517 StreamSubscription<T> listen(void onData(T value), |
1420 { Function onError, | 1518 { Function onError, |
1421 void onDone(), | 1519 void onDone(), |
1422 bool cancelOnError }) { | 1520 bool cancelOnError }) { |
1423 return _stream.listen(onData, onError: onError, onDone: onDone, | 1521 return _stream.listen(onData, onError: onError, onDone: onDone, |
1424 cancelOnError: cancelOnError); | 1522 cancelOnError: cancelOnError); |
1425 } | 1523 } |
1426 } | 1524 } |
1427 | 1525 |
1428 | 1526 |
1429 /** | 1527 /** |
1430 * The target of a [Stream.pipe] call. | 1528 * Abstract interface for a "sink" accepting multiple entire streams. |
1431 * | 1529 * |
1432 * The [Stream.pipe] call will pass itself to this object, and then return | 1530 * A consumer can accept a number of consecutive streams using [addStream], |
1433 * the resulting [Future]. The pipe should complete the future when it's | 1531 * and when no further data need to be added, the [close] method tells the |
1434 * done. | 1532 * consumer to complete its work and shut down. |
| 1533 * |
| 1534 * This class is not just a [Sink<Stream>] because it is also combined with |
| 1535 * other [Sink] classes, like it's combined with [EventSink] in the |
| 1536 * [StreamSink] class. |
| 1537 * |
| 1538 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream |
| 1539 * to the consumer's [addStream] method. When that completes, it will |
| 1540 * call [close] and then complete its own returned future. |
1435 */ | 1541 */ |
1436 abstract class StreamConsumer<S> { | 1542 abstract class StreamConsumer<S> { |
1437 /** | 1543 /** |
1438 * Consumes the elements of [stream]. | 1544 * Consumes the elements of [stream]. |
1439 * | 1545 * |
1440 * Listens on [stream] and does something for each event. | 1546 * Listens on [stream] and does something for each event. |
1441 * | 1547 * |
1442 * The consumer may stop listening after an error, or it may consume | 1548 * Returns a future which is completed when the stream is done being added, |
1443 * all the errors and only stop at a done event. | 1549 * and the consumer is ready to accept a new stream. |
| 1550 * No further calls to [addStream] or [close] should happen before the |
| 1551 * returned future has completed. |
| 1552 * |
| 1553 * The consumer may stop listening to the stream after an error, |
| 1554 * it may consume all the errors and only stop at a done event, |
| 1555 * or it may be canceled early if the receiver don't want any further events. |
| 1556 * |
| 1557 * If the consumer stops listening because of some error preventing it |
| 1558 * from continuing, it may report this error in the returned future, |
| 1559 * otherwise it will just complete the future with `null`. |
1444 */ | 1560 */ |
1445 Future addStream(Stream<S> stream); | 1561 Future addStream(Stream<S> stream); |
1446 | 1562 |
1447 /** | 1563 /** |
1448 * Tell the consumer that no futher streams will be added. | 1564 * Tells the consumer that no further streams will be added. |
1449 * | 1565 * |
1450 * Returns a future that is completed when the consumer is done handling | 1566 * This allows the consumer to complete any remaining work and release |
1451 * events. | 1567 * resources that are no longer needed |
| 1568 * |
| 1569 * Returns a future which is completed when the consumer has shut down. |
| 1570 * If cleaning up can fail, the error may be reported in the returned future, |
| 1571 * otherwise it completes with `null`. |
1452 */ | 1572 */ |
1453 Future close(); | 1573 Future close(); |
1454 } | 1574 } |
1455 | 1575 |
1456 | 1576 |
1457 /** | 1577 /** |
| 1578 * A object that accepts stream events both synchronously and asynchronously. |
| 1579 * |
1458 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and | 1580 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and |
1459 * the synchronous methods from [EventSink]. | 1581 * the synchronous methods from [EventSink]. |
1460 * | 1582 * |
1461 * The [EventSink] methods can't be used while the [addStream] is called. | 1583 * The [EventSink] methods can't be used while the [addStream] is called. |
1462 * As soon as the [addStream]'s [Future] completes with a value, the | 1584 * As soon as the [addStream]'s [Future] completes with a value, the |
1463 * [EventSink] methods can be used again. | 1585 * [EventSink] methods can be used again. |
1464 * | 1586 * |
1465 * If [addStream] is called after any of the [EventSink] methods, it'll | 1587 * If [addStream] is called after any of the [EventSink] methods, it'll |
1466 * be delayed until the underlying system has consumed the data added by the | 1588 * be delayed until the underlying system has consumed the data added by the |
1467 * [EventSink] methods. | 1589 * [EventSink] methods. |
1468 * | 1590 * |
1469 * When [EventSink] methods are used, the [done] [Future] can be used to | 1591 * When [EventSink] methods are used, the [done] [Future] can be used to |
1470 * catch any errors. | 1592 * catch any errors. |
1471 * | 1593 * |
1472 * When [close] is called, it will return the [done] [Future]. | 1594 * When [close] is called, it will return the [done] [Future]. |
1473 */ | 1595 */ |
1474 abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { | 1596 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> { |
1475 /** | 1597 /** |
1476 * As [EventSink.close], but returns a future. | 1598 * Tells the stream sink that no further streams will be added. |
| 1599 * |
| 1600 * This allows the stream sink to complete any remaining work and release |
| 1601 * resources that are no longer needed |
| 1602 * |
| 1603 * Returns a future which is completed when the stream sink has shut down. |
| 1604 * If cleaning up can fail, the error may be reported in the returned future, |
| 1605 * otherwise it completes with `null`. |
1477 * | 1606 * |
1478 * Returns the same future as [done]. | 1607 * Returns the same future as [done]. |
| 1608 * |
| 1609 * The stream sink may close before the [close] method is called, either due |
| 1610 * to an error or because it is itself provding events to someone who has |
| 1611 * stopped listening. In that case, the [done] future is completed first, |
| 1612 * and the `close` method will return the `done` future when called. |
| 1613 * |
| 1614 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their |
| 1615 * object as not expecting any further events. |
1479 */ | 1616 */ |
1480 Future close(); | 1617 Future close(); |
1481 | 1618 |
1482 /** | 1619 /** |
1483 * Return a future which is completed when the [StreamSink] is finished. | 1620 * Return a future which is completed when the [StreamSink] is finished. |
1484 * | 1621 * |
1485 * If the `StreamSink` fails with an error, | 1622 * If the `StreamSink` fails with an error, |
1486 * perhaps in response to adding events using [add], [addError] or [close], | 1623 * perhaps in response to adding events using [add], [addError] or [close], |
1487 * the [done] future will complete with that error. | 1624 * the [done] future will complete with that error. |
1488 * | 1625 * |
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1554 * onListen: () { | 1691 * onListen: () { |
1555 * subscription = input.listen((data) { | 1692 * subscription = input.listen((data) { |
1556 * // Duplicate the data. | 1693 * // Duplicate the data. |
1557 * controller.add(data); | 1694 * controller.add(data); |
1558 * controller.add(data); | 1695 * controller.add(data); |
1559 * }, | 1696 * }, |
1560 * onError: controller.addError, | 1697 * onError: controller.addError, |
1561 * onDone: controller.close, | 1698 * onDone: controller.close, |
1562 * cancelOnError: cancelOnError); | 1699 * cancelOnError: cancelOnError); |
1563 * }, | 1700 * }, |
1564 * onPause: subscription.pause, | 1701 * onPause: () { subscription.pause(); }, |
1565 * onResume: subscription.resume, | 1702 * onResume: () { subscription.resume(); }, |
1566 * onCancel: subscription.cancel, | 1703 * onCancel: () { subscription.cancel(); }, |
1567 * sync: true); | 1704 * sync: true); |
1568 * return controller.stream.listen(null); | 1705 * return controller.stream.listen(null); |
1569 * }); | 1706 * }); |
1570 */ | 1707 */ |
1571 const factory StreamTransformer( | 1708 const factory StreamTransformer( |
1572 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) | 1709 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) |
1573 = _StreamSubscriptionTransformer<S, T>; | 1710 = _StreamSubscriptionTransformer<S, T>; |
1574 | 1711 |
1575 /** | 1712 /** |
1576 * Creates a [StreamTransformer] that delegates events to the given functions. | 1713 * Creates a [StreamTransformer] that delegates events to the given functions. |
1577 * | 1714 * |
1578 * Example use of a duplicating transformer: | 1715 * Example use of a duplicating transformer: |
1579 * | 1716 * |
1580 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( | 1717 * stringStream.transform(new StreamTransformer<String, String>.fromHandle
rs( |
1581 * handleData: (String value, EventSink<String> sink) { | 1718 * handleData: (String value, EventSink<String> sink) { |
1582 * sink.add(value); | 1719 * sink.add(value); |
1583 * sink.add(value); // Duplicate the incoming events. | 1720 * sink.add(value); // Duplicate the incoming events. |
1584 * })); | 1721 * })); |
1585 */ | 1722 */ |
1586 factory StreamTransformer.fromHandlers({ | 1723 factory StreamTransformer.fromHandlers({ |
1587 void handleData(S data, EventSink<T> sink), | 1724 void handleData(S data, EventSink<T> sink), |
1588 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), | 1725 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), |
1589 void handleDone(EventSink<T> sink)}) | 1726 void handleDone(EventSink<T> sink)}) |
1590 = _StreamHandlerTransformer<S, T>; | 1727 = _StreamHandlerTransformer<S, T>; |
1591 | 1728 |
1592 /** | 1729 /** |
1593 * Transform the incoming [stream]'s events. | 1730 * Transform the incoming [stream]'s events. |
1594 * | 1731 * |
1595 * Creates a new stream. | 1732 * Creates a new stream. |
1596 * When this stream is listened to, it will start listening on [stream], | 1733 * When this stream is listened to, it will start listening on [stream], |
1597 * and generate events on the new stream based on the events from [stream]. | 1734 * and generate events on the new stream based on the events from [stream]. |
1598 * | 1735 * |
1599 * Subscriptions on the returned stream should propagate pause state | 1736 * Subscriptions on the returned stream should propagate pause state |
1600 * to the subscription on [stream]. | 1737 * to the subscription on [stream]. |
1601 */ | 1738 */ |
1602 Stream<T> bind(Stream<S> stream); | 1739 Stream<T> bind(Stream<S> stream); |
1603 } | 1740 } |
1604 | 1741 |
1605 /** | 1742 /** |
1606 * An [Iterable] like interface for the values of a [Stream]. | 1743 * An [Iterator] like interface for the values of a [Stream]. |
1607 * | 1744 * |
1608 * This wraps a [Stream] and a subscription on the stream. It listens | 1745 * This wraps a [Stream] and a subscription on the stream. It listens |
1609 * on the stream, and completes the future returned by [moveNext] when the | 1746 * on the stream, and completes the future returned by [moveNext] when the |
1610 * next value becomes available. | 1747 * next value becomes available. |
1611 */ | 1748 */ |
1612 abstract class StreamIterator<T> { | 1749 abstract class StreamIterator<T> { |
1613 | 1750 |
1614 /** Create a [StreamIterator] on [stream]. */ | 1751 /** Create a [StreamIterator] on [stream]. */ |
1615 factory StreamIterator(Stream<T> stream) | 1752 factory StreamIterator(Stream<T> stream) |
1616 // TODO(lrn): use redirecting factory constructor when type | 1753 // TODO(lrn): use redirecting factory constructor when type |
1617 // arguments are supported. | 1754 // arguments are supported. |
1618 => new _StreamIteratorImpl<T>(stream); | 1755 => new _StreamIteratorImpl<T>(stream); |
1619 | 1756 |
1620 /** | 1757 /** |
1621 * Wait for the next stream value to be available. | 1758 * Wait for the next stream value to be available. |
1622 * | 1759 * |
1623 * It is not allowed to call this function again until the future has | 1760 * Returns a future which will complete with either `true` or `false`. |
1624 * completed. If the returned future completes with anything except `true`, | 1761 * Completing with `true` means that another event has been received and |
1625 * the iterator is done, and no new value will ever be available. | 1762 * can be read as [current]. |
| 1763 * Completing with `false` means that the stream itearation is done and |
| 1764 * no further events will ever be available. |
| 1765 * The future may complete with an error, if the stream produces an error, |
| 1766 * which also ends iteration. |
1626 * | 1767 * |
1627 * The future may complete with an error, if the stream produces an error. | 1768 * The function must not be called again until the future returned by a |
| 1769 * previous call is completed. |
1628 */ | 1770 */ |
1629 Future<bool> moveNext(); | 1771 Future<bool> moveNext(); |
1630 | 1772 |
1631 /** | 1773 /** |
1632 * The current value of the stream. | 1774 * The current value of the stream. |
1633 * | 1775 * |
1634 * Only valid when the future returned by [moveNext] completes with `true` | 1776 * Is `null` before the first call to [moveNext] and after a call to |
1635 * as value, and only until the next call to [moveNext]. | 1777 * `moveNext` completes with a `false` result or an error. |
| 1778 * |
| 1779 * When a `moveNext` call completes with `true`, the `current` field holds |
| 1780 * the most recent event of the stream, and it stays like that until the next |
| 1781 * call to `moveNext`. |
| 1782 * Between a call to `moveNext` and when its returned future completes, |
| 1783 * the value is unspecified. |
1636 */ | 1784 */ |
1637 T get current; | 1785 T get current; |
1638 | 1786 |
1639 /** | 1787 /** |
1640 * Cancels the stream iterator (and the underlying stream subscription) early. | 1788 * Cancels the stream iterator (and the underlying stream subscription) early. |
1641 * | 1789 * |
1642 * The stream iterator is automatically canceled if the [moveNext] future | 1790 * The stream iterator is automatically canceled if the [moveNext] future |
1643 * completes with either `false` or an error. | 1791 * completes with either `false` or an error. |
1644 * | 1792 * |
1645 * If a [moveNext] call has been made, it will complete with `false` as value, | |
1646 * as will all further calls to [moveNext]. | |
1647 * | |
1648 * If you need to stop listening for values before the stream iterator is | 1793 * If you need to stop listening for values before the stream iterator is |
1649 * automatically closed, you must call [cancel] to ensure that the stream | 1794 * automatically closed, you must call [cancel] to ensure that the stream |
1650 * is properly closed. | 1795 * is properly closed. |
1651 * | 1796 * |
| 1797 * If [moveNext] has been called when the iterator is cancelled, |
| 1798 * its returned future will complete with `false` as value, |
| 1799 * as will all further calls to [moveNext]. |
| 1800 * |
1652 * Returns a future if the cancel-operation is not completed synchronously. | 1801 * Returns a future if the cancel-operation is not completed synchronously. |
1653 * Otherwise returns `null`. | 1802 * Otherwise returns `null`. |
1654 */ | 1803 */ |
1655 Future cancel(); | 1804 Future cancel(); |
1656 } | 1805 } |
1657 | 1806 |
1658 | 1807 |
1659 /** | 1808 /** |
1660 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. | 1809 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. |
1661 */ | 1810 */ |
1662 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1811 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1663 EventSink _sink; | 1812 EventSink _sink; |
1664 _ControllerEventSinkWrapper(this._sink); | 1813 _ControllerEventSinkWrapper(this._sink); |
1665 | 1814 |
1666 void add(T data) { _sink.add(data); } | 1815 void add(T data) { _sink.add(data); } |
1667 void addError(error, [StackTrace stackTrace]) { | 1816 void addError(error, [StackTrace stackTrace]) { |
1668 _sink.addError(error, stackTrace); | 1817 _sink.addError(error, stackTrace); |
1669 } | 1818 } |
1670 void close() { _sink.close(); } | 1819 void close() { _sink.close(); } |
1671 } | 1820 } |
OLD | NEW |