Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(51)

Side by Side Diff: tool/input_sdk/lib/async/stream.dart

Issue 1953153002: Update dart:async to match the Dart repo. (Closed) Base URL: https://github.com/dart-lang/dev_compiler.git@master
Patch Set: Remove unneeded calls. Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.async; 5 part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
11 typedef void _TimerCallback();
12
11 /** 13 /**
12 * A source of asynchronous data events. 14 * A source of asynchronous data events.
13 * 15 *
14 * A Stream provides a way to receive a sequence of events. 16 * A Stream provides a way to receive a sequence of events.
15 * Each event is either a data event or an error event, 17 * Each event is either a data event or an error event,
16 * representing the result of a single computation. 18 * representing the result of a single computation.
17 * When the events provided by a Stream have all been sent, 19 * When the events provided by a Stream have all been sent,
18 * a single "done" event will mark the end. 20 * a single "done" event will mark the end.
19 * 21 *
20 * You can [listen] on a stream to make it start generating events, 22 * You can [listen] on a stream to make it start generating events,
(...skipping 20 matching lines...) Expand all
41 * 43 *
42 * *A broadcast stream* allows any number of listeners, and it fires 44 * *A broadcast stream* allows any number of listeners, and it fires
43 * its events when they are ready, whether there are listeners or not. 45 * its events when they are ready, whether there are listeners or not.
44 * 46 *
45 * Broadcast streams are used for independent events/observers. 47 * Broadcast streams are used for independent events/observers.
46 * 48 *
47 * If several listeners want to listen to a single subscription stream, 49 * If several listeners want to listen to a single subscription stream,
48 * use [asBroadcastStream] to create a broadcast stream on top of the 50 * use [asBroadcastStream] to create a broadcast stream on top of the
49 * non-broadcast stream. 51 * non-broadcast stream.
50 * 52 *
51 * On either kind of stream, stream transformationss, such as [where] and 53 * On either kind of stream, stream transformations, such as [where] and
52 * [skip], return the same type of stream as the one the method was called on, 54 * [skip], return the same type of stream as the one the method was called on,
53 * unless otherwise noted. 55 * unless otherwise noted.
54 * 56 *
55 * When an event is fired, the listener(s) at that time will receive the event. 57 * When an event is fired, the listener(s) at that time will receive the event.
56 * If a listener is added to a broadcast stream while an event is being fired, 58 * If a listener is added to a broadcast stream while an event is being fired,
57 * that listener will not receive the event currently being fired. 59 * that listener will not receive the event currently being fired.
58 * If a listener is canceled, it immediately stops receiving events. 60 * If a listener is canceled, it immediately stops receiving events.
59 * 61 *
60 * When the "done" event is fired, subscribers are unsubscribed before 62 * When the "done" event is fired, subscribers are unsubscribed before
61 * receiving the event. After the event has been sent, the stream has no 63 * receiving the event. After the event has been sent, the stream has no
62 * subscribers. Adding new subscribers to a broadcast stream after this point 64 * subscribers. Adding new subscribers to a broadcast stream after this point
63 * is allowed, but they will just receive a new "done" event as soon 65 * is allowed, but they will just receive a new "done" event as soon
64 * as possible. 66 * as possible.
65 * 67 *
66 * Stream subscriptions always respect "pause" requests. If necessary they need 68 * Stream subscriptions always respect "pause" requests. If necessary they need
67 * to buffer their input, but often, and preferably, they can simply request 69 * to buffer their input, but often, and preferably, they can simply request
68 * their input to pause too. 70 * their input to pause too.
69 * 71 *
70 * The default implementation of [isBroadcast] returns false. 72 * The default implementation of [isBroadcast] returns false.
71 * A broadcast stream inheriting from [Stream] must override [isBroadcast] 73 * A broadcast stream inheriting from [Stream] must override [isBroadcast]
72 * to return `true`. 74 * to return `true`.
73 */ 75 */
74 abstract class Stream<T> { 76 abstract class Stream<T> {
75 Stream(); 77 Stream();
76 78
77 /** 79 /**
80 * Internal use only. We do not want to promise that Stream stays const.
81 *
82 * If mixins become compatible with const constructors, we may use a
83 * stream mixin instead of extending Stream from a const class.
84 */
85 const Stream._internal();
86
87 /**
88 * Creates an empty broadcast stream.
89 *
90 * This is a stream which does nothing except sending a done event
91 * when it's listened to.
92 */
93 const factory Stream.empty() = _EmptyStream<T>;
94
95 /**
78 * Creates a new single-subscription stream from the future. 96 * Creates a new single-subscription stream from the future.
79 * 97 *
80 * When the future completes, the stream will fire one event, either 98 * When the future completes, the stream will fire one event, either
81 * data or error, and then close with a done-event. 99 * data or error, and then close with a done-event.
82 */ 100 */
83 factory Stream.fromFuture(Future<T> future) { 101 factory Stream.fromFuture(Future<T> future) {
84 // Use the controller's buffering to fill in the value even before 102 // Use the controller's buffering to fill in the value even before
85 // the stream has a listener. For a single value, it's not worth it 103 // the stream has a listener. For a single value, it's not worth it
86 // to wait for a listener before doing the `then` on the future. 104 // to wait for a listener before doing the `then` on the future.
87 _StreamController<T> controller = 105 _StreamController<T> controller = new StreamController<T>(sync: true);
88 new StreamController<T>(sync: true) as _StreamController<T>;
89 future.then((value) { 106 future.then((value) {
90 controller._add(value); 107 controller._add(value);
91 controller._closeUnchecked(); 108 controller._closeUnchecked();
92 }, 109 },
93 onError: (error, stackTrace) { 110 onError: (error, stackTrace) {
94 controller._addError(error, stackTrace); 111 controller._addError(error, stackTrace);
95 controller._closeUnchecked(); 112 controller._closeUnchecked();
96 }); 113 });
97 return controller.stream; 114 return controller.stream;
98 } 115 }
99 116
100 /** 117 /**
118 * Create a stream from a group of futures.
119 *
120 * The stream reports the results of the futures on the stream in the order
121 * in which the futures complete.
122 *
123 * If some futures have completed before calling `Stream.fromFutures`,
124 * their result will be output on the created stream in some unspecified
125 * order.
126 *
127 * When all futures have completed, the stream is closed.
128 *
129 * If no future is passed, the stream closes as soon as possible.
130 */
131 factory Stream.fromFutures(Iterable<Future<T>> futures) {
132 _StreamController<T> controller = new StreamController<T>(sync: true);
133 int count = 0;
134 var onValue = (T value) {
135 if (!controller.isClosed) {
136 controller._add(value);
137 if (--count == 0) controller._closeUnchecked();
138 }
139 };
140 var onError = (error, stack) {
141 if (!controller.isClosed) {
142 controller._addError(error, stack);
143 if (--count == 0) controller._closeUnchecked();
144 }
145 };
146 // The futures are already running, so start listening to them immediately
147 // (instead of waiting for the stream to be listened on).
148 // If we wait, we might not catch errors in the futures in time.
149 for (var future in futures) {
150 count++;
151 future.then(onValue, onError: onError);
152 }
153 // Use schedule microtask since controller is sync.
154 if (count == 0) scheduleMicrotask(controller.close);
155 return controller.stream;
156 }
157
158 /**
101 * Creates a single-subscription stream that gets its data from [data]. 159 * Creates a single-subscription stream that gets its data from [data].
102 * 160 *
103 * The iterable is iterated when the stream receives a listener, and stops 161 * The iterable is iterated when the stream receives a listener, and stops
104 * iterating if the listener cancels the subscription. 162 * iterating if the listener cancels the subscription.
105 * 163 *
106 * If iterating [data] throws an error, the stream ends immediately with 164 * If iterating [data] throws an error, the stream ends immediately with
107 * that error. No done event will be sent (iteration is not complete), but no 165 * that error. No done event will be sent (iteration is not complete), but no
108 * further data events will be generated either, since iteration cannot 166 * further data events will be generated either, since iteration cannot
109 * continue. 167 * continue.
110 */ 168 */
111 factory Stream.fromIterable(Iterable<T> data) { 169 factory Stream.fromIterable(Iterable<T> data) {
112 return new _GeneratedStreamImpl<T>( 170 return new _GeneratedStreamImpl<T>(
113 () => new _IterablePendingEvents<T>(data)); 171 () => new _IterablePendingEvents<T>(data));
114 } 172 }
115 173
116 /** 174 /**
117 * Creates a stream that repeatedly emits events at [period] intervals. 175 * Creates a stream that repeatedly emits events at [period] intervals.
118 * 176 *
119 * The event values are computed by invoking [computation]. The argument to 177 * The event values are computed by invoking [computation]. The argument to
120 * this callback is an integer that starts with 0 and is incremented for 178 * this callback is an integer that starts with 0 and is incremented for
121 * every event. 179 * every event.
122 * 180 *
123 * If [computation] is omitted the event values will all be `null`. 181 * If [computation] is omitted the event values will all be `null`.
124 */ 182 */
125 factory Stream.periodic(Duration period, 183 factory Stream.periodic(Duration period,
126 [T computation(int computationCount)]) { 184 [T computation(int computationCount)]) {
127 if (computation == null) computation = ((i) => null);
128
129 Timer timer; 185 Timer timer;
130 int computationCount = 0; 186 int computationCount = 0;
131 StreamController<T> controller; 187 StreamController<T> controller;
132 // Counts the time that the Stream was running (and not paused). 188 // Counts the time that the Stream was running (and not paused).
133 Stopwatch watch = new Stopwatch(); 189 Stopwatch watch = new Stopwatch();
134 190
135 void sendEvent() { 191 void sendEvent() {
136 watch.reset(); 192 watch.reset();
137 T data = computation(computationCount++); 193 T data;
194 if (computation != null) {
195 try {
196 data = computation(computationCount++);
197 } catch (e, s) {
198 controller.addError(e, s);
199 return;
200 }
201 }
138 controller.add(data); 202 controller.add(data);
139 } 203 }
140 204
141 void startPeriodicTimer() { 205 void startPeriodicTimer() {
142 assert(timer == null); 206 assert(timer == null);
143 timer = new Timer.periodic(period, (Timer timer) { 207 timer = new Timer.periodic(period, (Timer timer) {
144 sendEvent(); 208 sendEvent();
145 }); 209 });
146 } 210 }
147 211
(...skipping 42 matching lines...) Expand 10 before | Expand all | Expand 10 after
190 * 254 *
191 * class DuplicationSink implements EventSink<String> { 255 * class DuplicationSink implements EventSink<String> {
192 * final EventSink<String> _outputSink; 256 * final EventSink<String> _outputSink;
193 * DuplicationSink(this._outputSink); 257 * DuplicationSink(this._outputSink);
194 * 258 *
195 * void add(String data) { 259 * void add(String data) {
196 * _outputSink.add(data); 260 * _outputSink.add(data);
197 * _outputSink.add(data); 261 * _outputSink.add(data);
198 * } 262 * }
199 * 263 *
200 * void addError(e, [st]) => _outputSink(e, st); 264 * void addError(e, [st]) { _outputSink.addError(e, st); }
201 * void close() => _outputSink.close(); 265 * void close() { _outputSink.close(); }
202 * } 266 * }
203 * 267 *
204 * class DuplicationTransformer implements StreamTransformer<String, Strin g> { 268 * class DuplicationTransformer implements StreamTransformer<String, Strin g> {
205 * // Some generic types ommitted for brevety. 269 * // Some generic types ommitted for brevety.
206 * Stream bind(Stream stream) => new Stream<String>.eventTransform( 270 * Stream bind(Stream stream) => new Stream<String>.eventTransformed(
207 * stream, 271 * stream,
208 * (EventSink sink) => new DuplicationSink(sink)); 272 * (EventSink sink) => new DuplicationSink(sink));
209 * } 273 * }
210 * 274 *
211 * stringStream.transform(new DuplicationTransformer()); 275 * stringStream.transform(new DuplicationTransformer());
212 * 276 *
213 * The resulting stream is a broadcast stream if [source] is. 277 * The resulting stream is a broadcast stream if [source] is.
214 */ 278 */
215 factory Stream.eventTransformed(Stream source, 279 factory Stream.eventTransformed(Stream source,
216 EventSink mapSink(EventSink<T> sink)) { 280 EventSink mapSink(EventSink<T> sink)) {
(...skipping 39 matching lines...) Expand 10 before | Expand all | Expand 10 after
256 * is called. If [onData] is null, nothing happens. 320 * is called. If [onData] is null, nothing happens.
257 * 321 *
258 * On errors from this stream, the [onError] handler is given a 322 * On errors from this stream, the [onError] handler is given a
259 * object describing the error. 323 * object describing the error.
260 * 324 *
261 * The [onError] callback must be of type `void onError(error)` or 325 * The [onError] callback must be of type `void onError(error)` or
262 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts 326 * `void onError(error, StackTrace stackTrace)`. If [onError] accepts
263 * two arguments it is called with the stack trace (which could be `null` if 327 * two arguments it is called with the stack trace (which could be `null` if
264 * the stream itself received an error without stack trace). 328 * the stream itself received an error without stack trace).
265 * Otherwise it is called with just the error object. 329 * Otherwise it is called with just the error object.
330 * If [onError] is omitted, any errors on the stream are considered unhandled,
331 * and will be passed to the current [Zone]'s error handler.
332 * By default unhandled async errors are treated
333 * as if they were uncaught top-level errors.
266 * 334 *
267 * If this stream closes, the [onDone] handler is called. 335 * If this stream closes, the [onDone] handler is called.
268 * 336 *
269 * If [cancelOnError] is true, the subscription is ended when 337 * If [cancelOnError] is true, the subscription is ended when
270 * the first error is reported. The default is false. 338 * the first error is reported. The default is false.
271 */ 339 */
272 StreamSubscription<T> listen(void onData(T event), 340 StreamSubscription<T> listen(void onData(T event),
273 { Function onError, 341 { Function onError,
274 void onDone(), 342 void onDone(),
275 bool cancelOnError}); 343 bool cancelOnError});
276 344
277 /** 345 /**
278 * Creates a new stream from this stream that discards some data events. 346 * Creates a new stream from this stream that discards some data events.
279 * 347 *
280 * The new stream sends the same error and done events as this stream, 348 * The new stream sends the same error and done events as this stream,
281 * but it only sends the data events that satisfy the [test]. 349 * but it only sends the data events that satisfy the [test].
282 * 350 *
283 * The returned stream is a broadcast stream if this stream is. 351 * The returned stream is a broadcast stream if this stream is.
284 * If a broadcast stream is listened to more than once, each subscription 352 * If a broadcast stream is listened to more than once, each subscription
285 * will individually perform the `test`. 353 * will individually perform the `test`.
286 */ 354 */
287 Stream<T> where(bool test(T event)) { 355 Stream<T> where(bool test(T event)) {
288 return new _WhereStream<T>(this, test); 356 return new _WhereStream<T>(this, test);
289 } 357 }
290 358
291 /** 359 /**
292 * Creates a new stream that converts each element of this stream 360 * Creates a new stream that converts each element of this stream
293 * to a new value using the [convert] function. 361 * to a new value using the [convert] function.
294 * 362 *
363 * For each data event, `o`, in this stream, the returned stream
364 * provides a data event with the value `convert(o)`.
365 * If [convert] throws, the returned stream reports the exception as an error
366 * event instead.
367 *
368 * Error and done events are passed through unchanged to the returned stream.
369 *
295 * The returned stream is a broadcast stream if this stream is. 370 * The returned stream is a broadcast stream if this stream is.
371 * The [convert] function is called once per data event per listener.
296 * If a broadcast stream is listened to more than once, each subscription 372 * If a broadcast stream is listened to more than once, each subscription
297 * will individually execute `map` for each event. 373 * will individually call [convert] on each data event.
298 */ 374 */
299 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) { 375 Stream/*<S>*/ map/*<S>*/(/*=S*/ convert(T event)) {
300 return new _MapStream<T, dynamic/*=S*/>(this, convert); 376 return new _MapStream<T, dynamic/*=S*/>(this, convert);
301 } 377 }
302 378
303 /** 379 /**
304 * Creates a new stream with each data event of this stream asynchronously 380 * Creates a new stream with each data event of this stream asynchronously
305 * mapped to a new event. 381 * mapped to a new event.
306 * 382 *
307 * This acts like [map], except that [convert] may return a [Future], 383 * This acts like [map], except that [convert] may return a [Future],
308 * and in that case, the stream waits for that future to complete before 384 * and in that case, the stream waits for that future to complete before
309 * continuing with its result. 385 * continuing with its result.
310 * 386 *
311 * The returned stream is a broadcast stream if this stream is. 387 * The returned stream is a broadcast stream if this stream is.
312 */ 388 */
313 Stream asyncMap(convert(T event)) { 389 Stream/*<E>*/ asyncMap/*<E>*/(convert(T event)) {
314 StreamController controller; 390 StreamController/*<E>*/ controller;
315 StreamSubscription subscription; 391 StreamSubscription/*<T>*/ subscription;
316 void onListen () { 392
393 void onListen() {
317 final add = controller.add; 394 final add = controller.add;
318 assert(controller is _StreamController || 395 assert(controller is _StreamController ||
319 controller is _BroadcastStreamController); 396 controller is _BroadcastStreamController);
320 final eventSink = controller as _EventSink<T>; 397 final _EventSink/*<E>*/ eventSink =
398 controller as Object /*=_EventSink<E>*/;
321 final addError = eventSink._addError; 399 final addError = eventSink._addError;
322 subscription = this.listen( 400 subscription = this.listen(
323 (T event) { 401 (T event) {
324 var newValue; 402 dynamic newValue;
325 try { 403 try {
326 newValue = convert(event); 404 newValue = convert(event);
327 } catch (e, s) { 405 } catch (e, s) {
328 controller.addError(e, s); 406 controller.addError(e, s);
329 return; 407 return;
330 } 408 }
331 if (newValue is Future) { 409 if (newValue is Future) {
332 subscription.pause(); 410 subscription.pause();
333 newValue.then(add, onError: addError) 411 newValue.then(add, onError: addError)
334 .whenComplete(subscription.resume); 412 .whenComplete(subscription.resume);
335 } else { 413 } else {
336 controller.add(newValue); 414 controller.add(newValue as Object/*=E*/);
337 } 415 }
338 }, 416 },
339 onError: addError, 417 onError: addError,
340 onDone: controller.close 418 onDone: controller.close
341 ); 419 );
342 } 420 }
421
343 if (this.isBroadcast) { 422 if (this.isBroadcast) {
344 controller = new StreamController.broadcast( 423 controller = new StreamController/*<E>*/.broadcast(
345 onListen: onListen, 424 onListen: onListen,
346 onCancel: () { subscription.cancel(); }, 425 onCancel: () { subscription.cancel(); },
347 sync: true 426 sync: true
348 ); 427 );
349 } else { 428 } else {
350 controller = new StreamController( 429 controller = new StreamController/*<E>*/(
351 onListen: onListen, 430 onListen: onListen,
352 onPause: () { subscription.pause(); }, 431 onPause: () { subscription.pause(); },
353 onResume: () { subscription.resume(); }, 432 onResume: () { subscription.resume(); },
354 onCancel: () { subscription.cancel(); }, 433 onCancel: () { subscription.cancel(); },
355 sync: true 434 sync: true
356 ); 435 );
357 } 436 }
358 return controller.stream; 437 return controller.stream;
359 } 438 }
360 439
361 /** 440 /**
362 * Creates a new stream with the events of a stream per original event. 441 * Creates a new stream with the events of a stream per original event.
363 * 442 *
364 * This acts like [expand], except that [convert] returns a [Stream] 443 * This acts like [expand], except that [convert] returns a [Stream]
365 * instead of an [Iterable]. 444 * instead of an [Iterable].
366 * The events of the returned stream becomes the events of the returned 445 * The events of the returned stream becomes the events of the returned
367 * stream, in the order they are produced. 446 * stream, in the order they are produced.
368 * 447 *
369 * If [convert] returns `null`, no value is put on the output stream, 448 * If [convert] returns `null`, no value is put on the output stream,
370 * just as if it returned an empty stream. 449 * just as if it returned an empty stream.
371 * 450 *
372 * The returned stream is a broadcast stream if this stream is. 451 * The returned stream is a broadcast stream if this stream is.
373 */ 452 */
374 Stream asyncExpand(Stream convert(T event)) { 453 Stream/*<E>*/ asyncExpand/*<E>*/(Stream/*<E>*/ convert(T event)) {
375 StreamController controller; 454 StreamController/*<E>*/ controller;
376 StreamSubscription subscription; 455 StreamSubscription<T> subscription;
377 void onListen() { 456 void onListen() {
378 assert(controller is _StreamController || 457 assert(controller is _StreamController ||
379 controller is _BroadcastStreamController); 458 controller is _BroadcastStreamController);
380 final eventSink = controller as _EventSink<T>; 459 final _EventSink/*<E>*/ eventSink =
460 controller as Object /*=_EventSink<E>*/;
381 subscription = this.listen( 461 subscription = this.listen(
382 (T event) { 462 (T event) {
383 Stream newStream; 463 Stream/*<E>*/ newStream;
384 try { 464 try {
385 newStream = convert(event); 465 newStream = convert(event);
386 } catch (e, s) { 466 } catch (e, s) {
387 controller.addError(e, s); 467 controller.addError(e, s);
388 return; 468 return;
389 } 469 }
390 if (newStream != null) { 470 if (newStream != null) {
391 subscription.pause(); 471 subscription.pause();
392 controller.addStream(newStream) 472 controller.addStream(newStream)
393 .whenComplete(subscription.resume); 473 .whenComplete(subscription.resume);
394 } 474 }
395 }, 475 },
396 onError: eventSink._addError, // Avoid Zone error replacement. 476 onError: eventSink._addError, // Avoid Zone error replacement.
397 onDone: controller.close 477 onDone: controller.close
398 ); 478 );
399 } 479 }
400 if (this.isBroadcast) { 480 if (this.isBroadcast) {
401 controller = new StreamController.broadcast( 481 controller = new StreamController/*<E>*/.broadcast(
402 onListen: onListen, 482 onListen: onListen,
403 onCancel: () { subscription.cancel(); }, 483 onCancel: () { subscription.cancel(); },
404 sync: true 484 sync: true
405 ); 485 );
406 } else { 486 } else {
407 controller = new StreamController( 487 controller = new StreamController/*<E>*/(
408 onListen: onListen, 488 onListen: onListen,
409 onPause: () { subscription.pause(); }, 489 onPause: () { subscription.pause(); },
410 onResume: () { subscription.resume(); }, 490 onResume: () { subscription.resume(); },
411 onCancel: () { subscription.cancel(); }, 491 onCancel: () { subscription.cancel(); },
412 sync: true 492 sync: true
413 ); 493 );
414 } 494 }
415 return controller.stream; 495 return controller.stream;
416 } 496 }
417 497
(...skipping 38 matching lines...) Expand 10 before | Expand all | Expand 10 after
456 * 536 *
457 * The returned stream is a broadcast stream if this stream is. 537 * The returned stream is a broadcast stream if this stream is.
458 * If a broadcast stream is listened to more than once, each subscription 538 * If a broadcast stream is listened to more than once, each subscription
459 * will individually call `convert` and expand the events. 539 * will individually call `convert` and expand the events.
460 */ 540 */
461 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) { 541 Stream/*<S>*/ expand/*<S>*/(Iterable/*<S>*/ convert(T value)) {
462 return new _ExpandStream<T, dynamic/*=S*/>(this, convert); 542 return new _ExpandStream<T, dynamic/*=S*/>(this, convert);
463 } 543 }
464 544
465 /** 545 /**
466 * Binds this stream as the input of the provided [StreamConsumer]. 546 * Pipe the events of this stream into [streamConsumer].
467 * 547 *
468 * The `streamConsumer` is closed when the stream has been added to it. 548 * The events of this stream are added to `streamConsumer` using
549 * [StreamConsumer.addStream].
550 * The `streamConsumer` is closed when this stream has been successfully added
551 * to it - when the future returned by `addStream` completes without an error.
469 * 552 *
470 * Returns a future which completes when the stream has been consumed 553 * Returns a future which completes when the stream has been consumed
471 * and the consumer has been closed. 554 * and the consumer has been closed.
555 *
556 * The returned future completes with the same result as the future returned
557 * by [StreamConsumer.close].
558 * If the adding of the stream itself fails in some way,
559 * then the consumer is expected to be closed, and won't be closed again.
560 * In that case the returned future completes with the error from calling
561 * `addStream`.
472 */ 562 */
473 Future pipe(StreamConsumer<T> streamConsumer) { 563 Future pipe(StreamConsumer<T> streamConsumer) {
474 return streamConsumer.addStream(this).then((_) => streamConsumer.close()); 564 return streamConsumer.addStream(this).then((_) => streamConsumer.close());
475 } 565 }
476 566
477 /** 567 /**
478 * Chains this stream as the input of the provided [StreamTransformer]. 568 * Chains this stream as the input of the provided [StreamTransformer].
479 * 569 *
480 * Returns the result of [:streamTransformer.bind:] itself. 570 * Returns the result of [:streamTransformer.bind:] itself.
481 * 571 *
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
519 } 609 }
520 }, 610 },
521 cancelOnError: true 611 cancelOnError: true
522 ); 612 );
523 return result; 613 return result;
524 } 614 }
525 615
526 /** Reduces a sequence of values by repeatedly applying [combine]. */ 616 /** Reduces a sequence of values by repeatedly applying [combine]. */
527 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue, 617 Future/*<S>*/ fold/*<S>*/(var/*=S*/ initialValue,
528 /*=S*/ combine(var/*=S*/ previous, T element)) { 618 /*=S*/ combine(var/*=S*/ previous, T element)) {
529 _Future/*<S>*/ result = new _Future(); 619
530 var value = initialValue; 620 _Future/*<S>*/ result = new _Future/*<S>*/();
621 var/*=S*/ value = initialValue;
531 StreamSubscription subscription; 622 StreamSubscription subscription;
532 subscription = this.listen( 623 subscription = this.listen(
533 (T element) { 624 (T element) {
534 _runUserCode( 625 _runUserCode(
535 () => combine(value, element), 626 () => combine(value, element),
536 (newValue) { value = newValue; }, 627 (/*=S*/ newValue) { value = newValue; },
537 _cancelAndErrorClosure(subscription, result) 628 _cancelAndErrorClosure(subscription, result)
538 ); 629 );
539 }, 630 },
540 onError: (e, st) { 631 onError: (e, st) {
541 result._completeError(e, st); 632 result._completeError(e, st);
542 }, 633 },
543 onDone: () { 634 onDone: () {
544 result._complete(value); 635 result._complete(value);
545 }, 636 },
546 cancelOnError: true); 637 cancelOnError: true);
(...skipping 241 matching lines...) Expand 10 before | Expand all | Expand 10 after
788 * Discards all data on the stream, but signals when it's done or an error 879 * Discards all data on the stream, but signals when it's done or an error
789 * occured. 880 * occured.
790 * 881 *
791 * When subscribing using [drain], cancelOnError will be true. This means 882 * When subscribing using [drain], cancelOnError will be true. This means
792 * that the future will complete with the first error on the stream and then 883 * that the future will complete with the first error on the stream and then
793 * cancel the subscription. 884 * cancel the subscription.
794 * 885 *
795 * In case of a `done` event the future completes with the given 886 * In case of a `done` event the future completes with the given
796 * [futureValue]. 887 * [futureValue].
797 */ 888 */
798 Future drain([var futureValue]) => listen(null, cancelOnError: true) 889 Future/*<E>*/ drain/*<E>*/([/*=E*/ futureValue])
799 .asFuture(futureValue); 890 => listen(null, cancelOnError: true).asFuture/*<E>*/(futureValue);
800 891
801 /** 892 /**
802 * Provides at most the first [n] values of this stream. 893 * Provides at most the first [n] values of this stream.
803 * 894 *
804 * Forwards the first [n] data events of this stream, and all error 895 * Forwards the first [n] data events of this stream, and all error
805 * events, to the returned stream, and ends with a done event. 896 * events, to the returned stream, and ends with a done event.
806 * 897 *
807 * If this stream produces fewer than [count] values before it's done, 898 * If this stream produces fewer than [count] values before it's done,
808 * so will the returned stream. 899 * so will the returned stream.
809 * 900 *
810 * Stops listening to the stream after the first [n] elements have been 901 * Stops listening to the stream after the first [n] elements have been
811 * received. 902 * received.
812 * 903 *
813 * Internally the method cancels its subscription after these elements. This 904 * Internally the method cancels its subscription after these elements. This
814 * means that single-subscription (non-broadcast) streams are closed and 905 * means that single-subscription (non-broadcast) streams are closed and
815 * cannot be reused after a call to this method. 906 * cannot be reused after a call to this method.
816 * 907 *
817 * The returned stream is a broadcast stream if this stream is. 908 * The returned stream is a broadcast stream if this stream is.
818 * For a broadcast stream, the events are only counted from the time 909 * For a broadcast stream, the events are only counted from the time
819 * the returned stream is listened to. 910 * the returned stream is listened to.
820 */ 911 */
821 Stream<T> take(int count) { 912 Stream<T> take(int count) {
822 return new _TakeStream(this, count); 913 return new _TakeStream<T>(this, count);
823 } 914 }
824 915
825 /** 916 /**
826 * Forwards data events while [test] is successful. 917 * Forwards data events while [test] is successful.
827 * 918 *
828 * The returned stream provides the same events as this stream as long 919 * The returned stream provides the same events as this stream as long
829 * as [test] returns [:true:] for the event data. The stream is done 920 * as [test] returns [:true:] for the event data. The stream is done
830 * when either this stream is done, or when this stream first provides 921 * when either this stream is done, or when this stream first provides
831 * a value that [test] doesn't accept. 922 * a value that [test] doesn't accept.
832 * 923 *
833 * Stops listening to the stream after the accepted elements. 924 * Stops listening to the stream after the accepted elements.
834 * 925 *
835 * Internally the method cancels its subscription after these elements. This 926 * Internally the method cancels its subscription after these elements. This
836 * means that single-subscription (non-broadcast) streams are closed and 927 * means that single-subscription (non-broadcast) streams are closed and
837 * cannot be reused after a call to this method. 928 * cannot be reused after a call to this method.
838 * 929 *
839 * The returned stream is a broadcast stream if this stream is. 930 * The returned stream is a broadcast stream if this stream is.
840 * For a broadcast stream, the events are only tested from the time 931 * For a broadcast stream, the events are only tested from the time
841 * the returned stream is listened to. 932 * the returned stream is listened to.
842 */ 933 */
843 Stream<T> takeWhile(bool test(T element)) { 934 Stream<T> takeWhile(bool test(T element)) {
844 return new _TakeWhileStream(this, test); 935 return new _TakeWhileStream<T>(this, test);
845 } 936 }
846 937
847 /** 938 /**
848 * Skips the first [count] data events from this stream. 939 * Skips the first [count] data events from this stream.
849 * 940 *
850 * The returned stream is a broadcast stream if this stream is. 941 * The returned stream is a broadcast stream if this stream is.
851 * For a broadcast stream, the events are only counted from the time 942 * For a broadcast stream, the events are only counted from the time
852 * the returned stream is listened to. 943 * the returned stream is listened to.
853 */ 944 */
854 Stream<T> skip(int count) { 945 Stream<T> skip(int count) {
855 return new _SkipStream(this, count); 946 return new _SkipStream<T>(this, count);
856 } 947 }
857 948
858 /** 949 /**
859 * Skip data events from this stream while they are matched by [test]. 950 * Skip data events from this stream while they are matched by [test].
860 * 951 *
861 * Error and done events are provided by the returned stream unmodified. 952 * Error and done events are provided by the returned stream unmodified.
862 * 953 *
863 * Starting with the first data event where [test] returns false for the 954 * Starting with the first data event where [test] returns false for the
864 * event data, the returned stream will have the same events as this stream. 955 * event data, the returned stream will have the same events as this stream.
865 * 956 *
866 * The returned stream is a broadcast stream if this stream is. 957 * The returned stream is a broadcast stream if this stream is.
867 * For a broadcast stream, the events are only tested from the time 958 * For a broadcast stream, the events are only tested from the time
868 * the returned stream is listened to. 959 * the returned stream is listened to.
869 */ 960 */
870 Stream<T> skipWhile(bool test(T element)) { 961 Stream<T> skipWhile(bool test(T element)) {
871 return new _SkipWhileStream(this, test); 962 return new _SkipWhileStream<T>(this, test);
872 } 963 }
873 964
874 /** 965 /**
875 * Skips data events if they are equal to the previous data event. 966 * Skips data events if they are equal to the previous data event.
876 * 967 *
877 * The returned stream provides the same events as this stream, except 968 * The returned stream provides the same events as this stream, except
878 * that it never provides two consequtive data events that are equal. 969 * that it never provides two consecutive data events that are equal.
879 * 970 *
880 * Equality is determined by the provided [equals] method. If that is 971 * Equality is determined by the provided [equals] method. If that is
881 * omitted, the '==' operator on the last provided data element is used. 972 * omitted, the '==' operator on the last provided data element is used.
882 * 973 *
883 * The returned stream is a broadcast stream if this stream is. 974 * The returned stream is a broadcast stream if this stream is.
884 * If a broadcast stream is listened to more than once, each subscription 975 * If a broadcast stream is listened to more than once, each subscription
885 * will individually perform the `equals` test. 976 * will individually perform the `equals` test.
886 */ 977 */
887 Stream<T> distinct([bool equals(T previous, T next)]) { 978 Stream<T> distinct([bool equals(T previous, T next)]) {
888 return new _DistinctStream(this, equals); 979 return new _DistinctStream<T>(this, equals);
889 } 980 }
890 981
891 /** 982 /**
892 * Returns the first element of the stream. 983 * Returns the first element of the stream.
893 * 984 *
894 * Stops listening to the stream after the first element has been received. 985 * Stops listening to the stream after the first element has been received.
895 * 986 *
896 * Internally the method cancels its subscription after the first element. 987 * Internally the method cancels its subscription after the first element.
897 * This means that single-subscription (non-broadcast) streams are closed 988 * This means that single-subscription (non-broadcast) streams are closed
898 * and cannot be reused after a call to this getter. 989 * and cannot be reused after a call to this getter.
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
931 * If an error event occurs before the first data event, the resulting future 1022 * If an error event occurs before the first data event, the resulting future
932 * is completed with that error. 1023 * is completed with that error.
933 * 1024 *
934 * If this stream is empty (a done event occurs before the first data event), 1025 * If this stream is empty (a done event occurs before the first data event),
935 * the resulting future completes with a [StateError]. 1026 * the resulting future completes with a [StateError].
936 */ 1027 */
937 Future<T> get last { 1028 Future<T> get last {
938 _Future<T> future = new _Future<T>(); 1029 _Future<T> future = new _Future<T>();
939 T result = null; 1030 T result = null;
940 bool foundResult = false; 1031 bool foundResult = false;
941 StreamSubscription subscription; 1032 listen(
942 subscription = this.listen(
943 (T value) { 1033 (T value) {
944 foundResult = true; 1034 foundResult = true;
945 result = value; 1035 result = value;
946 }, 1036 },
947 onError: future._completeError, 1037 onError: future._completeError,
948 onDone: () { 1038 onDone: () {
949 if (foundResult) { 1039 if (foundResult) {
950 future._complete(result); 1040 future._complete(result);
951 return; 1041 return;
952 } 1042 }
(...skipping 248 matching lines...) Expand 10 before | Expand all | Expand 10 after
1201 * This `EventSink` is only valid during the call to `onTimeout`. 1291 * This `EventSink` is only valid during the call to `onTimeout`.
1202 * 1292 *
1203 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException] 1293 * If `onTimeout` is omitted, a timeout will just put a [TimeoutException]
1204 * into the error channel of the returned stream. 1294 * into the error channel of the returned stream.
1205 * 1295 *
1206 * The returned stream is a broadcast stream if this stream is. 1296 * The returned stream is a broadcast stream if this stream is.
1207 * If a broadcast stream is listened to more than once, each subscription 1297 * If a broadcast stream is listened to more than once, each subscription
1208 * will have its individually timer that starts counting on listen, 1298 * will have its individually timer that starts counting on listen,
1209 * and the subscriptions' timers can be paused individually. 1299 * and the subscriptions' timers can be paused individually.
1210 */ 1300 */
1211 Stream timeout(Duration timeLimit, {void onTimeout(EventSink sink)}) { 1301 Stream<T> timeout(Duration timeLimit, {void onTimeout(EventSink<T> sink)}) {
1212 StreamController controller; 1302 StreamController<T> controller;
1213 // The following variables are set on listen. 1303 // The following variables are set on listen.
1214 StreamSubscription<T> subscription; 1304 StreamSubscription<T> subscription;
1215 Timer timer; 1305 Timer timer;
1216 Zone zone; 1306 Zone zone;
1217 Function timeout2; 1307 _TimerCallback timeout;
1218 1308
1219 void onData(T event) { 1309 void onData(T event) {
1220 timer.cancel(); 1310 timer.cancel();
1221 controller.add(event); 1311 controller.add(event);
1222 timer = zone.createTimer(timeLimit, timeout2); 1312 timer = zone.createTimer(timeLimit, timeout);
1223 } 1313 }
1224 void onError(error, StackTrace stackTrace) { 1314 void onError(error, StackTrace stackTrace) {
1225 timer.cancel(); 1315 timer.cancel();
1226 assert(controller is _StreamController || 1316 assert(controller is _StreamController ||
1227 controller is _BroadcastStreamController); 1317 controller is _BroadcastStreamController);
1228 var eventSink = controller as _EventSink<T>; 1318 dynamic eventSink = controller;
1229 eventSink._addError(error, stackTrace); // Avoid Zone error replacement. 1319 eventSink._addError(error, stackTrace); // Avoid Zone error replacement.
1230 timer = zone.createTimer(timeLimit, timeout2); 1320 timer = zone.createTimer(timeLimit, timeout);
1231 } 1321 }
1232 void onDone() { 1322 void onDone() {
1233 timer.cancel(); 1323 timer.cancel();
1234 controller.close(); 1324 controller.close();
1235 } 1325 }
1236 void onListen() { 1326 void onListen() {
1237 // This is the onListen callback for of controller. 1327 // This is the onListen callback for of controller.
1238 // It runs in the same zone that the subscription was created in. 1328 // It runs in the same zone that the subscription was created in.
1239 // Use that zone for creating timers and running the onTimeout 1329 // Use that zone for creating timers and running the onTimeout
1240 // callback. 1330 // callback.
1241 zone = Zone.current; 1331 zone = Zone.current;
1242 if (onTimeout == null) { 1332 if (onTimeout == null) {
1243 timeout2 = () { 1333 timeout = () {
1244 controller.addError(new TimeoutException("No stream event", 1334 controller.addError(new TimeoutException("No stream event",
1245 timeLimit), null); 1335 timeLimit), null);
1246 }; 1336 };
1247 } else { 1337 } else {
1248 onTimeout = zone.registerUnaryCallback(onTimeout); 1338 // TODO(floitsch): the return type should be 'void', and the type
1339 // should be inferred.
1340 var registeredOnTimeout =
1341 zone.registerUnaryCallback/*<dynamic, EventSink<T>>*/(onTimeout);
1249 _ControllerEventSinkWrapper wrapper = 1342 _ControllerEventSinkWrapper wrapper =
1250 new _ControllerEventSinkWrapper(null); 1343 new _ControllerEventSinkWrapper(null);
1251 timeout2 = () { 1344 timeout = () {
1252 wrapper._sink = controller; // Only valid during call. 1345 wrapper._sink = controller; // Only valid during call.
1253 zone.runUnaryGuarded(onTimeout, wrapper); 1346 zone.runUnaryGuarded(registeredOnTimeout, wrapper);
1254 wrapper._sink = null; 1347 wrapper._sink = null;
1255 }; 1348 };
1256 } 1349 }
1257 1350
1258 subscription = this.listen(onData, onError: onError, onDone: onDone); 1351 subscription = this.listen(onData, onError: onError, onDone: onDone);
1259 timer = zone.createTimer(timeLimit, timeout2); 1352 timer = zone.createTimer(timeLimit, timeout);
1260 } 1353 }
1261 Future onCancel() { 1354 Future onCancel() {
1262 timer.cancel(); 1355 timer.cancel();
1263 Future result = subscription.cancel(); 1356 Future result = subscription.cancel();
1264 subscription = null; 1357 subscription = null;
1265 return result; 1358 return result;
1266 } 1359 }
1267 controller = isBroadcast 1360 controller = isBroadcast
1268 ? new _SyncBroadcastStreamController(onListen, onCancel) 1361 ? new _SyncBroadcastStreamController<T>(onListen, onCancel)
1269 : new _SyncStreamController( 1362 : new _SyncStreamController<T>(
1270 onListen, 1363 onListen,
1271 () { 1364 () {
1272 // Don't null the timer, onCancel may call cancel again. 1365 // Don't null the timer, onCancel may call cancel again.
1273 timer.cancel(); 1366 timer.cancel();
1274 subscription.pause(); 1367 subscription.pause();
1275 }, 1368 },
1276 () { 1369 () {
1277 subscription.resume(); 1370 subscription.resume();
1278 timer = zone.createTimer(timeLimit, timeout2); 1371 timer = zone.createTimer(timeLimit, timeout);
1279 }, 1372 },
1280 onCancel); 1373 onCancel);
1281 return controller.stream; 1374 return controller.stream;
1282 } 1375 }
1283 } 1376 }
1284 1377
1285 /** 1378 /**
1286 * A subscritption on events from a [Stream]. 1379 * A subscription on events from a [Stream].
1287 * 1380 *
1288 * When you listen on a [Stream] using [Stream.listen], 1381 * When you listen on a [Stream] using [Stream.listen],
1289 * a [StreamSubscription] object is returned. 1382 * a [StreamSubscription] object is returned.
1290 * 1383 *
1291 * The subscription provides events to the listener, 1384 * The subscription provides events to the listener,
1292 * and holds the callbacks used to handle the events. 1385 * and holds the callbacks used to handle the events.
1293 * The subscription can also be used to unsubscribe from the events, 1386 * The subscription can also be used to unsubscribe from the events,
1294 * or to temporarily pause the events from the stream. 1387 * or to temporarily pause the events from the stream.
1295 */ 1388 */
1296 abstract class StreamSubscription<T> { 1389 abstract class StreamSubscription<T> {
1297 /** 1390 /**
1298 * Cancels this subscription. It will no longer receive events. 1391 * Cancels this subscription.
1299 * 1392 *
1300 * May return a future which completes when the stream is done cleaning up. 1393 * After this call, the subscription no longer receives events.
1301 * This can be used if the stream needs to release some resources
1302 * that are needed for a following operation,
1303 * for example a file being read, that should be deleted afterwards.
1304 * In that case, the file may not be able to be deleted successfully
1305 * until the returned future has completed.
1306 * 1394 *
1307 * The future will be completed with a `null` value. 1395 * The stream may need to shut down the source of events and clean up after
1396 * the subscription is canceled.
1397 *
1398 * Returns a future that is completed once the stream has finished
1399 * its cleanup. May also return `null` if no cleanup was necessary.
1400 *
1401 * Typically, futures are returned when the stream needs to release resources.
1402 * For example, a stream might need to close an open file (as an asynchronous
1403 * operation). If the listener wants to delete the file after having
1404 * canceled the subscription, it must wait for the cleanup future to complete.
1405 *
1406 * A returned future completes with a `null` value.
1308 * If the cleanup throws, which it really shouldn't, the returned future 1407 * If the cleanup throws, which it really shouldn't, the returned future
1309 * will be completed with that error. 1408 * completes with that error.
1310 *
1311 * Returns `null` if there is no need to wait.
1312 */ 1409 */
1313 Future cancel(); 1410 Future cancel();
1314 1411
1315 /** 1412 /**
1316 * Set or override the data event handler of this subscription. 1413 * Set or override the data event handler of this subscription.
1317 * 1414 *
1318 * This method overrides the handler that has been set at the invocation of 1415 * This method overrides the handler that has been set at the invocation of
1319 * [Stream.listen]. 1416 * [Stream.listen].
1320 */ 1417 */
1321 void onData(void handleData(T data)); 1418 void onData(void handleData(T data));
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
1378 * 1475 *
1379 * This method *overwrites* the existing [onDone] and [onError] callbacks 1476 * This method *overwrites* the existing [onDone] and [onError] callbacks
1380 * with new ones that complete the returned future. 1477 * with new ones that complete the returned future.
1381 * 1478 *
1382 * In case of an error the subscription will automatically cancel (even 1479 * In case of an error the subscription will automatically cancel (even
1383 * when it was listening with `cancelOnError` set to `false`). 1480 * when it was listening with `cancelOnError` set to `false`).
1384 * 1481 *
1385 * In case of a `done` event the future completes with the given 1482 * In case of a `done` event the future completes with the given
1386 * [futureValue]. 1483 * [futureValue].
1387 */ 1484 */
1388 Future asFuture([var futureValue]); 1485 Future/*<E>*/ asFuture/*<E>*/([var/*=E*/ futureValue]);
1389 } 1486 }
1390 1487
1391 1488
1392 /** 1489 /**
1393 * An interface that abstracts creation or handling of [Stream] events. 1490 * An interface that abstracts creation or handling of [Stream] events.
1394 */ 1491 */
1395 abstract class EventSink<T> implements Sink<T> { 1492 abstract class EventSink<T> implements Sink<T> {
1396 /** Send a data event to a stream. */ 1493 /** Send a data event to a stream. */
1397 void add(T event); 1494 void add(T event);
1398 1495
1399 /** Send an async error to a stream. */ 1496 /** Send an async error to a stream. */
1400 void addError(errorEvent, [StackTrace stackTrace]); 1497 void addError(errorEvent, [StackTrace stackTrace]);
1401 1498
1402 /** Close the sink. No further events can be added after closing. */ 1499 /** Close the sink. No further events can be added after closing. */
1403 void close(); 1500 void close();
1404 } 1501 }
1405 1502
1406 1503
1407 /** [Stream] wrapper that only exposes the [Stream] interface. */ 1504 /** [Stream] wrapper that only exposes the [Stream] interface. */
1408 class StreamView<T> extends Stream<T> { 1505 class StreamView<T> extends Stream<T> {
1409 Stream<T> _stream; 1506 final Stream<T> _stream;
1410 1507
1411 StreamView(this._stream); 1508 const StreamView(Stream<T> stream) : _stream = stream, super._internal();
1412 1509
1413 bool get isBroadcast => _stream.isBroadcast; 1510 bool get isBroadcast => _stream.isBroadcast;
1414 1511
1415 Stream<T> asBroadcastStream({void onListen(StreamSubscription<T> subscription) , 1512 Stream<T> asBroadcastStream(
1416 void onCancel(StreamSubscription<T> subscription) }) 1513 {void onListen(StreamSubscription<T> subscription),
1514 void onCancel(StreamSubscription<T> subscription)})
1417 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel); 1515 => _stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
1418 1516
1419 StreamSubscription<T> listen(void onData(T value), 1517 StreamSubscription<T> listen(void onData(T value),
1420 { Function onError, 1518 { Function onError,
1421 void onDone(), 1519 void onDone(),
1422 bool cancelOnError }) { 1520 bool cancelOnError }) {
1423 return _stream.listen(onData, onError: onError, onDone: onDone, 1521 return _stream.listen(onData, onError: onError, onDone: onDone,
1424 cancelOnError: cancelOnError); 1522 cancelOnError: cancelOnError);
1425 } 1523 }
1426 } 1524 }
1427 1525
1428 1526
1429 /** 1527 /**
1430 * The target of a [Stream.pipe] call. 1528 * Abstract interface for a "sink" accepting multiple entire streams.
1431 * 1529 *
1432 * The [Stream.pipe] call will pass itself to this object, and then return 1530 * A consumer can accept a number of consecutive streams using [addStream],
1433 * the resulting [Future]. The pipe should complete the future when it's 1531 * and when no further data need to be added, the [close] method tells the
1434 * done. 1532 * consumer to complete its work and shut down.
1533 *
1534 * This class is not just a [Sink<Stream>] because it is also combined with
1535 * other [Sink] classes, like it's combined with [EventSink] in the
1536 * [StreamSink] class.
1537 *
1538 * The [Stream.pipe] accepts a `StreamConsumer` and will pass the stream
1539 * to the consumer's [addStream] method. When that completes, it will
1540 * call [close] and then complete its own returned future.
1435 */ 1541 */
1436 abstract class StreamConsumer<S> { 1542 abstract class StreamConsumer<S> {
1437 /** 1543 /**
1438 * Consumes the elements of [stream]. 1544 * Consumes the elements of [stream].
1439 * 1545 *
1440 * Listens on [stream] and does something for each event. 1546 * Listens on [stream] and does something for each event.
1441 * 1547 *
1442 * The consumer may stop listening after an error, or it may consume 1548 * Returns a future which is completed when the stream is done being added,
1443 * all the errors and only stop at a done event. 1549 * and the consumer is ready to accept a new stream.
1550 * No further calls to [addStream] or [close] should happen before the
1551 * returned future has completed.
1552 *
1553 * The consumer may stop listening to the stream after an error,
1554 * it may consume all the errors and only stop at a done event,
1555 * or it may be canceled early if the receiver don't want any further events.
1556 *
1557 * If the consumer stops listening because of some error preventing it
1558 * from continuing, it may report this error in the returned future,
1559 * otherwise it will just complete the future with `null`.
1444 */ 1560 */
1445 Future addStream(Stream<S> stream); 1561 Future addStream(Stream<S> stream);
1446 1562
1447 /** 1563 /**
1448 * Tell the consumer that no futher streams will be added. 1564 * Tells the consumer that no further streams will be added.
1449 * 1565 *
1450 * Returns a future that is completed when the consumer is done handling 1566 * This allows the consumer to complete any remaining work and release
1451 * events. 1567 * resources that are no longer needed
1568 *
1569 * Returns a future which is completed when the consumer has shut down.
1570 * If cleaning up can fail, the error may be reported in the returned future,
1571 * otherwise it completes with `null`.
1452 */ 1572 */
1453 Future close(); 1573 Future close();
1454 } 1574 }
1455 1575
1456 1576
1457 /** 1577 /**
1578 * A object that accepts stream events both synchronously and asynchronously.
1579 *
1458 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and 1580 * A [StreamSink] unifies the asynchronous methods from [StreamConsumer] and
1459 * the synchronous methods from [EventSink]. 1581 * the synchronous methods from [EventSink].
1460 * 1582 *
1461 * The [EventSink] methods can't be used while the [addStream] is called. 1583 * The [EventSink] methods can't be used while the [addStream] is called.
1462 * As soon as the [addStream]'s [Future] completes with a value, the 1584 * As soon as the [addStream]'s [Future] completes with a value, the
1463 * [EventSink] methods can be used again. 1585 * [EventSink] methods can be used again.
1464 * 1586 *
1465 * If [addStream] is called after any of the [EventSink] methods, it'll 1587 * If [addStream] is called after any of the [EventSink] methods, it'll
1466 * be delayed until the underlying system has consumed the data added by the 1588 * be delayed until the underlying system has consumed the data added by the
1467 * [EventSink] methods. 1589 * [EventSink] methods.
1468 * 1590 *
1469 * When [EventSink] methods are used, the [done] [Future] can be used to 1591 * When [EventSink] methods are used, the [done] [Future] can be used to
1470 * catch any errors. 1592 * catch any errors.
1471 * 1593 *
1472 * When [close] is called, it will return the [done] [Future]. 1594 * When [close] is called, it will return the [done] [Future].
1473 */ 1595 */
1474 abstract class StreamSink<S> implements StreamConsumer<S>, EventSink<S> { 1596 abstract class StreamSink<S> implements EventSink<S>, StreamConsumer<S> {
1475 /** 1597 /**
1476 * As [EventSink.close], but returns a future. 1598 * Tells the stream sink that no further streams will be added.
1599 *
1600 * This allows the stream sink to complete any remaining work and release
1601 * resources that are no longer needed
1602 *
1603 * Returns a future which is completed when the stream sink has shut down.
1604 * If cleaning up can fail, the error may be reported in the returned future,
1605 * otherwise it completes with `null`.
1477 * 1606 *
1478 * Returns the same future as [done]. 1607 * Returns the same future as [done].
1608 *
1609 * The stream sink may close before the [close] method is called, either due
1610 * to an error or because it is itself provding events to someone who has
1611 * stopped listening. In that case, the [done] future is completed first,
1612 * and the `close` method will return the `done` future when called.
1613 *
1614 * Unifies [StreamConsumer.close] and [EventSink.close] which both mark their
1615 * object as not expecting any further events.
1479 */ 1616 */
1480 Future close(); 1617 Future close();
1481 1618
1482 /** 1619 /**
1483 * Return a future which is completed when the [StreamSink] is finished. 1620 * Return a future which is completed when the [StreamSink] is finished.
1484 * 1621 *
1485 * If the `StreamSink` fails with an error, 1622 * If the `StreamSink` fails with an error,
1486 * perhaps in response to adding events using [add], [addError] or [close], 1623 * perhaps in response to adding events using [add], [addError] or [close],
1487 * the [done] future will complete with that error. 1624 * the [done] future will complete with that error.
1488 * 1625 *
(...skipping 65 matching lines...) Expand 10 before | Expand all | Expand 10 after
1554 * onListen: () { 1691 * onListen: () {
1555 * subscription = input.listen((data) { 1692 * subscription = input.listen((data) {
1556 * // Duplicate the data. 1693 * // Duplicate the data.
1557 * controller.add(data); 1694 * controller.add(data);
1558 * controller.add(data); 1695 * controller.add(data);
1559 * }, 1696 * },
1560 * onError: controller.addError, 1697 * onError: controller.addError,
1561 * onDone: controller.close, 1698 * onDone: controller.close,
1562 * cancelOnError: cancelOnError); 1699 * cancelOnError: cancelOnError);
1563 * }, 1700 * },
1564 * onPause: subscription.pause, 1701 * onPause: () { subscription.pause(); },
1565 * onResume: subscription.resume, 1702 * onResume: () { subscription.resume(); },
1566 * onCancel: subscription.cancel, 1703 * onCancel: () { subscription.cancel(); },
1567 * sync: true); 1704 * sync: true);
1568 * return controller.stream.listen(null); 1705 * return controller.stream.listen(null);
1569 * }); 1706 * });
1570 */ 1707 */
1571 const factory StreamTransformer( 1708 const factory StreamTransformer(
1572 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError)) 1709 StreamSubscription<T> transformer(Stream<S> stream, bool cancelOnError))
1573 = _StreamSubscriptionTransformer<S, T>; 1710 = _StreamSubscriptionTransformer<S, T>;
1574 1711
1575 /** 1712 /**
1576 * Creates a [StreamTransformer] that delegates events to the given functions. 1713 * Creates a [StreamTransformer] that delegates events to the given functions.
1577 * 1714 *
1578 * Example use of a duplicating transformer: 1715 * Example use of a duplicating transformer:
1579 * 1716 *
1580 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs( 1717 * stringStream.transform(new StreamTransformer<String, String>.fromHandle rs(
1581 * handleData: (String value, EventSink<String> sink) { 1718 * handleData: (String value, EventSink<String> sink) {
1582 * sink.add(value); 1719 * sink.add(value);
1583 * sink.add(value); // Duplicate the incoming events. 1720 * sink.add(value); // Duplicate the incoming events.
1584 * })); 1721 * }));
1585 */ 1722 */
1586 factory StreamTransformer.fromHandlers({ 1723 factory StreamTransformer.fromHandlers({
1587 void handleData(S data, EventSink<T> sink), 1724 void handleData(S data, EventSink<T> sink),
1588 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink), 1725 void handleError(Object error, StackTrace stackTrace, EventSink<T> sink),
1589 void handleDone(EventSink<T> sink)}) 1726 void handleDone(EventSink<T> sink)})
1590 = _StreamHandlerTransformer<S, T>; 1727 = _StreamHandlerTransformer<S, T>;
1591 1728
1592 /** 1729 /**
1593 * Transform the incoming [stream]'s events. 1730 * Transform the incoming [stream]'s events.
1594 * 1731 *
1595 * Creates a new stream. 1732 * Creates a new stream.
1596 * When this stream is listened to, it will start listening on [stream], 1733 * When this stream is listened to, it will start listening on [stream],
1597 * and generate events on the new stream based on the events from [stream]. 1734 * and generate events on the new stream based on the events from [stream].
1598 * 1735 *
1599 * Subscriptions on the returned stream should propagate pause state 1736 * Subscriptions on the returned stream should propagate pause state
1600 * to the subscription on [stream]. 1737 * to the subscription on [stream].
1601 */ 1738 */
1602 Stream<T> bind(Stream<S> stream); 1739 Stream<T> bind(Stream<S> stream);
1603 } 1740 }
1604 1741
1605 /** 1742 /**
1606 * An [Iterable] like interface for the values of a [Stream]. 1743 * An [Iterator] like interface for the values of a [Stream].
1607 * 1744 *
1608 * This wraps a [Stream] and a subscription on the stream. It listens 1745 * This wraps a [Stream] and a subscription on the stream. It listens
1609 * on the stream, and completes the future returned by [moveNext] when the 1746 * on the stream, and completes the future returned by [moveNext] when the
1610 * next value becomes available. 1747 * next value becomes available.
1611 */ 1748 */
1612 abstract class StreamIterator<T> { 1749 abstract class StreamIterator<T> {
1613 1750
1614 /** Create a [StreamIterator] on [stream]. */ 1751 /** Create a [StreamIterator] on [stream]. */
1615 factory StreamIterator(Stream<T> stream) 1752 factory StreamIterator(Stream<T> stream)
1616 // TODO(lrn): use redirecting factory constructor when type 1753 // TODO(lrn): use redirecting factory constructor when type
1617 // arguments are supported. 1754 // arguments are supported.
1618 => new _StreamIteratorImpl<T>(stream); 1755 => new _StreamIteratorImpl<T>(stream);
1619 1756
1620 /** 1757 /**
1621 * Wait for the next stream value to be available. 1758 * Wait for the next stream value to be available.
1622 * 1759 *
1623 * It is not allowed to call this function again until the future has 1760 * Returns a future which will complete with either `true` or `false`.
1624 * completed. If the returned future completes with anything except `true`, 1761 * Completing with `true` means that another event has been received and
1625 * the iterator is done, and no new value will ever be available. 1762 * can be read as [current].
1763 * Completing with `false` means that the stream itearation is done and
1764 * no further events will ever be available.
1765 * The future may complete with an error, if the stream produces an error,
1766 * which also ends iteration.
1626 * 1767 *
1627 * The future may complete with an error, if the stream produces an error. 1768 * The function must not be called again until the future returned by a
1769 * previous call is completed.
1628 */ 1770 */
1629 Future<bool> moveNext(); 1771 Future<bool> moveNext();
1630 1772
1631 /** 1773 /**
1632 * The current value of the stream. 1774 * The current value of the stream.
1633 * 1775 *
1634 * Only valid when the future returned by [moveNext] completes with `true` 1776 * Is `null` before the first call to [moveNext] and after a call to
1635 * as value, and only until the next call to [moveNext]. 1777 * `moveNext` completes with a `false` result or an error.
1778 *
1779 * When a `moveNext` call completes with `true`, the `current` field holds
1780 * the most recent event of the stream, and it stays like that until the next
1781 * call to `moveNext`.
1782 * Between a call to `moveNext` and when its returned future completes,
1783 * the value is unspecified.
1636 */ 1784 */
1637 T get current; 1785 T get current;
1638 1786
1639 /** 1787 /**
1640 * Cancels the stream iterator (and the underlying stream subscription) early. 1788 * Cancels the stream iterator (and the underlying stream subscription) early.
1641 * 1789 *
1642 * The stream iterator is automatically canceled if the [moveNext] future 1790 * The stream iterator is automatically canceled if the [moveNext] future
1643 * completes with either `false` or an error. 1791 * completes with either `false` or an error.
1644 * 1792 *
1645 * If a [moveNext] call has been made, it will complete with `false` as value,
1646 * as will all further calls to [moveNext].
1647 *
1648 * If you need to stop listening for values before the stream iterator is 1793 * If you need to stop listening for values before the stream iterator is
1649 * automatically closed, you must call [cancel] to ensure that the stream 1794 * automatically closed, you must call [cancel] to ensure that the stream
1650 * is properly closed. 1795 * is properly closed.
1651 * 1796 *
1797 * If [moveNext] has been called when the iterator is cancelled,
1798 * its returned future will complete with `false` as value,
1799 * as will all further calls to [moveNext].
1800 *
1652 * Returns a future if the cancel-operation is not completed synchronously. 1801 * Returns a future if the cancel-operation is not completed synchronously.
1653 * Otherwise returns `null`. 1802 * Otherwise returns `null`.
1654 */ 1803 */
1655 Future cancel(); 1804 Future cancel();
1656 } 1805 }
1657 1806
1658 1807
1659 /** 1808 /**
1660 * Wraps an [_EventSink] so it exposes only the [EventSink] interface. 1809 * Wraps an [_EventSink] so it exposes only the [EventSink] interface.
1661 */ 1810 */
1662 class _ControllerEventSinkWrapper<T> implements EventSink<T> { 1811 class _ControllerEventSinkWrapper<T> implements EventSink<T> {
1663 EventSink _sink; 1812 EventSink _sink;
1664 _ControllerEventSinkWrapper(this._sink); 1813 _ControllerEventSinkWrapper(this._sink);
1665 1814
1666 void add(T data) { _sink.add(data); } 1815 void add(T data) { _sink.add(data); }
1667 void addError(error, [StackTrace stackTrace]) { 1816 void addError(error, [StackTrace stackTrace]) {
1668 _sink.addError(error, stackTrace); 1817 _sink.addError(error, stackTrace);
1669 } 1818 }
1670 void close() { _sink.close(); } 1819 void close() { _sink.close(); }
1671 } 1820 }
OLDNEW
« no previous file with comments | « tool/input_sdk/lib/async/schedule_microtask.dart ('k') | tool/input_sdk/lib/async/stream_controller.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698