OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
70 | 70 |
71 /** | 71 /** |
72 * Creates a single-subscription stream that gets its data from [data]. | 72 * Creates a single-subscription stream that gets its data from [data]. |
73 */ | 73 */ |
74 factory Stream.fromIterable(Iterable<T> data) { | 74 factory Stream.fromIterable(Iterable<T> data) { |
75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
76 return new _GeneratedSingleStreamImpl<T>(iterableEvents); | 76 return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
77 } | 77 } |
78 | 78 |
79 /** | 79 /** |
80 * Whether the stream is a broadcast stream. | 80 * Reports whether this stream is a broadcast stream. |
81 */ | 81 */ |
82 bool get isBroadcast => false; | 82 bool get isBroadcast => false; |
83 | 83 |
84 /** | 84 /** |
85 * Returns a multi-subscription stream that produces the same events as this. | 85 * Returns a multi-subscription stream that produces the same events as this. |
86 * | 86 * |
87 * If this stream is single-subscription, return a new stream that allows | 87 * If this stream is single-subscription, return a new stream that allows |
88 * multiple subscribers. It will subscribe to this stream when its first | 88 * multiple subscribers. It will subscribe to this stream when its first |
89 * subscriber is added, and unsubscribe again when the last subscription is | 89 * subscriber is added, and unsubscribe again when the last subscription is |
90 * cancelled. | 90 * cancelled. |
91 * | 91 * |
92 * If this stream is already a broadcast stream, it is returned unmodified. | 92 * If this stream is already a broadcast stream, it is returned unmodified. |
93 */ | 93 */ |
94 Stream<T> asBroadcastStream() { | 94 Stream<T> asBroadcastStream() { |
95 if (isBroadcast) return this; | 95 if (isBroadcast) return this; |
96 return new _SingleStreamMultiplexer<T>(this); | 96 return new _SingleStreamMultiplexer<T>(this); |
97 } | 97 } |
98 | 98 |
99 /** | 99 /** |
100 * Add a subscription to this stream. | 100 * Adds a subscription to this stream. |
101 * | 101 * |
102 * On each data event from this stream, the subscribers [onData] handler | 102 * On each data event from this stream, the subscriber's [onData] handler |
103 * is called. If [onData] is null, nothing happens. | 103 * is called. If [onData] is null, nothing happens. |
104 * | 104 * |
105 * On errors from this stream, the [onError] handler is given a | 105 * On errors from this stream, the [onError] handler is given a |
106 * [AsyncError] object describing the error. | 106 * [AsyncError] object describing the error. |
107 * | 107 * |
108 * If this stream closes, the [onDone] handler is called. | 108 * If this stream closes, the [onDone] handler is called. |
109 * | 109 * |
110 * If [unsubscribeOnError] is true, the subscription is ended when | 110 * If [unsubscribeOnError] is true, the subscription is ended when |
111 * the first error is reported. The default is false. | 111 * the first error is reported. The default is false. |
112 */ | 112 */ |
113 StreamSubscription<T> listen(void onData(T event), | 113 StreamSubscription<T> listen(void onData(T event), |
114 { void onError(AsyncError error), | 114 { void onError(AsyncError error), |
115 void onDone(), | 115 void onDone(), |
116 bool unsubscribeOnError}); | 116 bool unsubscribeOnError}); |
117 | 117 |
118 /** | 118 /** |
119 * Creates a new stream from this stream that discards some data events. | 119 * Creates a new stream from this stream that discards some data events. |
120 * | 120 * |
121 * The new stream sends the same error and done events as this stream, | 121 * The new stream sends the same error and done events as this stream, |
122 * but it only sends the data events that satisfy the [test]. | 122 * but it only sends the data events that satisfy the [test]. |
123 */ | 123 */ |
124 Stream<T> where(bool test(T event)) { | 124 Stream<T> where(bool test(T event)) { |
125 return new _WhereStream<T>(this, test); | 125 return new _WhereStream<T>(this, test); |
126 } | 126 } |
127 | 127 |
128 /** | 128 /** |
129 * Create a new stream that converts each element of this stream | 129 * Creates a new stream that converts each element of this stream |
130 * to a new value using the [convert] function. | 130 * to a new value using the [convert] function. |
131 */ | 131 */ |
132 Stream map(convert(T event)) { | 132 Stream map(convert(T event)) { |
133 return new _MapStream<T, dynamic>(this, convert); | 133 return new _MapStream<T, dynamic>(this, convert); |
134 } | 134 } |
135 | 135 |
136 /** | 136 /** |
137 * Create a wrapper Stream that intercepts some errors from this stream. | 137 * Creates a wrapper Stream that intercepts some errors from this stream. |
138 * | 138 * |
139 * If this stream sends an error that matches [test], then it is intercepted | 139 * If this stream sends an error that matches [test], then it is intercepted |
140 * by the [handle] function. | 140 * by the [handle] function. |
141 * | 141 * |
142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
143 * true. If [test] is omitted, every error is considered matching. | 143 * true. If [test] is omitted, every error is considered matching. |
144 * | 144 * |
145 * If the error is intercepted, the [handle] function can decide what to do | 145 * If the error is intercepted, the [handle] function can decide what to do |
146 * with it. It can throw if it wants to raise a new (or the same) error, | 146 * with it. It can throw if it wants to raise a new (or the same) error, |
147 * or simply return to make the stream forget the error. | 147 * or simply return to make the stream forget the error. |
148 * | 148 * |
149 * If you need to transform an error into a data event, use the more generic | 149 * If you need to transform an error into a data event, use the more generic |
150 * [Stream.transformEvent] to handle the event by writing a data event to | 150 * [Stream.transformEvent] to handle the event by writing a data event to |
151 * the output sink | 151 * the output sink |
152 */ | 152 */ |
153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
154 return new _HandleErrorStream<T>(this, handle, test); | 154 return new _HandleErrorStream<T>(this, handle, test); |
155 } | 155 } |
156 | 156 |
157 /** | 157 /** |
158 * Create a new stream from this stream that converts each element | 158 * Creates a new stream from this stream that converts each element |
159 * into zero or more events. | 159 * into zero or more events. |
160 * | 160 * |
161 * Each incoming event is converted to an [Iterable] of new events, | 161 * Each incoming event is converted to an [Iterable] of new events, |
162 * and each of these new events are then sent by the returned stream | 162 * and each of these new events are then sent by the returned stream |
163 * in order. | 163 * in order. |
164 */ | 164 */ |
165 Stream expand(Iterable convert(T value)) { | 165 Stream expand(Iterable convert(T value)) { |
166 return new _ExpandStream<T, dynamic>(this, convert); | 166 return new _ExpandStream<T, dynamic>(this, convert); |
167 } | 167 } |
168 | 168 |
169 /** | 169 /** |
170 * Bind this stream as the input of the provided [StreamConsumer]. | 170 * Binds this stream as the input of the provided [StreamConsumer]. |
171 */ | 171 */ |
172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) { | 172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) { |
173 return streamConsumer.consume(this); | 173 return streamConsumer.consume(this); |
174 } | 174 } |
175 | 175 |
176 /** | 176 /** |
177 * Chain this stream as the input of the provided [StreamTransformer]. | 177 * Chains this stream as the input of the provided [StreamTransformer]. |
178 * | 178 * |
179 * Returns the result of [:streamTransformer.bind:] itself. | 179 * Returns the result of [:streamTransformer.bind:] itself. |
180 */ | 180 */ |
181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { |
182 return streamTransformer.bind(this); | 182 return streamTransformer.bind(this); |
183 } | 183 } |
184 | 184 |
185 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 185 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
186 Future reduce(var initialValue, combine(var previous, T element)) { | 186 Future reduce(var initialValue, combine(var previous, T element)) { |
187 _FutureImpl result = new _FutureImpl(); | 187 _FutureImpl result = new _FutureImpl(); |
(...skipping 30 matching lines...) Expand all Loading... |
218 onDone: () { | 218 onDone: () { |
219 sink.close(); | 219 sink.close(); |
220 result._setValue(null); | 220 result._setValue(null); |
221 }, | 221 }, |
222 unsubscribeOnError: unsubscribeOnError); | 222 unsubscribeOnError: unsubscribeOnError); |
223 return result; | 223 return result; |
224 } | 224 } |
225 | 225 |
226 | 226 |
227 /** | 227 /** |
228 * Check whether [match] occurs in the elements provided by this stream. | 228 * Checks whether [match] occurs in the elements provided by this stream. |
229 * | 229 * |
230 * Completes the [Future] when the answer is known. | 230 * Completes the [Future] when the answer is known. |
231 * If this stream reports an error, the [Future] will report that error. | 231 * If this stream reports an error, the [Future] will report that error. |
232 */ | 232 */ |
233 Future<bool> contains(T match) { | 233 Future<bool> contains(T match) { |
234 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 234 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
235 StreamSubscription subscription; | 235 StreamSubscription subscription; |
236 subscription = this.listen( | 236 subscription = this.listen( |
237 // TODO(ahe): Restore type when feature is implemented in dart2js | 237 // TODO(ahe): Restore type when feature is implemented in dart2js |
238 // checked mode. http://dartbug.com/7733 | 238 // checked mode. http://dartbug.com/7733 |
(...skipping 11 matching lines...) Expand all Loading... |
250 }, | 250 }, |
251 onError: future._setError, | 251 onError: future._setError, |
252 onDone: () { | 252 onDone: () { |
253 future._setValue(false); | 253 future._setValue(false); |
254 }, | 254 }, |
255 unsubscribeOnError: true); | 255 unsubscribeOnError: true); |
256 return future; | 256 return future; |
257 } | 257 } |
258 | 258 |
259 /** | 259 /** |
260 * Check whether [test] accepts all elements provided by this stream. | 260 * Checks whether [test] accepts all elements provided by this stream. |
261 * | 261 * |
262 * Completes the [Future] when the answer is known. | 262 * Completes the [Future] when the answer is known. |
263 * If this stream reports an error, the [Future] will report that error. | 263 * If this stream reports an error, the [Future] will report that error. |
264 */ | 264 */ |
265 Future<bool> every(bool test(T element)) { | 265 Future<bool> every(bool test(T element)) { |
266 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 266 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
267 StreamSubscription subscription; | 267 StreamSubscription subscription; |
268 subscription = this.listen( | 268 subscription = this.listen( |
269 // TODO(ahe): Restore type when feature is implemented in dart2js | 269 // TODO(ahe): Restore type when feature is implemented in dart2js |
270 // checked mode. http://dartbug.com/7733 | 270 // checked mode. http://dartbug.com/7733 |
(...skipping 11 matching lines...) Expand all Loading... |
282 }, | 282 }, |
283 onError: future._setError, | 283 onError: future._setError, |
284 onDone: () { | 284 onDone: () { |
285 future._setValue(true); | 285 future._setValue(true); |
286 }, | 286 }, |
287 unsubscribeOnError: true); | 287 unsubscribeOnError: true); |
288 return future; | 288 return future; |
289 } | 289 } |
290 | 290 |
291 /** | 291 /** |
292 * Check whether [test] accepts any element provided by this stream. | 292 * Checks whether [test] accepts any element provided by this stream. |
293 * | 293 * |
294 * Completes the [Future] when the answer is known. | 294 * Completes the [Future] when the answer is known. |
295 * If this stream reports an error, the [Future] will report that error. | 295 * If this stream reports an error, the [Future] will report that error. |
296 */ | 296 */ |
297 Future<bool> any(bool test(T element)) { | 297 Future<bool> any(bool test(T element)) { |
298 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 298 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
299 StreamSubscription subscription; | 299 StreamSubscription subscription; |
300 subscription = this.listen( | 300 subscription = this.listen( |
301 // TODO(ahe): Restore type when feature is implemented in dart2js | 301 // TODO(ahe): Restore type when feature is implemented in dart2js |
302 // checked mode. http://dartbug.com/7733 | 302 // checked mode. http://dartbug.com/7733 |
(...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
433 future._setValue(false); | 433 future._setValue(false); |
434 }, | 434 }, |
435 onError: future._setError, | 435 onError: future._setError, |
436 onDone: () { | 436 onDone: () { |
437 future._setValue(true); | 437 future._setValue(true); |
438 }, | 438 }, |
439 unsubscribeOnError: true); | 439 unsubscribeOnError: true); |
440 return future; | 440 return future; |
441 } | 441 } |
442 | 442 |
443 /** Collect the data of this stream in a [List]. */ | 443 /** Collects the data of this stream in a [List]. */ |
444 Future<List<T>> toList() { | 444 Future<List<T>> toList() { |
445 List<T> result = <T>[]; | 445 List<T> result = <T>[]; |
446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); | 446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
447 this.listen( | 447 this.listen( |
448 // TODO(ahe): Restore type when feature is implemented in dart2js | 448 // TODO(ahe): Restore type when feature is implemented in dart2js |
449 // checked mode. http://dartbug.com/7733 | 449 // checked mode. http://dartbug.com/7733 |
450 (/*T*/ data) { | 450 (/*T*/ data) { |
451 result.add(data); | 451 result.add(data); |
452 }, | 452 }, |
453 onError: future._setError, | 453 onError: future._setError, |
454 onDone: () { | 454 onDone: () { |
455 future._setValue(result); | 455 future._setValue(result); |
456 }, | 456 }, |
457 unsubscribeOnError: true); | 457 unsubscribeOnError: true); |
458 return future; | 458 return future; |
459 } | 459 } |
460 | 460 |
461 /** Collect the data of this stream in a [Set]. */ | 461 /** Collects the data of this stream in a [Set]. */ |
462 Future<Set<T>> toSet() { | 462 Future<Set<T>> toSet() { |
463 Set<T> result = new Set<T>(); | 463 Set<T> result = new Set<T>(); |
464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); | 464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
465 this.listen( | 465 this.listen( |
466 // TODO(ahe): Restore type when feature is implemented in dart2js | 466 // TODO(ahe): Restore type when feature is implemented in dart2js |
467 // checked mode. http://dartbug.com/7733 | 467 // checked mode. http://dartbug.com/7733 |
468 (/*T*/ data) { | 468 (/*T*/ data) { |
469 result.add(data); | 469 result.add(data); |
470 }, | 470 }, |
471 onError: future._setError, | 471 onError: future._setError, |
472 onDone: () { | 472 onDone: () { |
473 future._setValue(result); | 473 future._setValue(result); |
474 }, | 474 }, |
475 unsubscribeOnError: true); | 475 unsubscribeOnError: true); |
476 return future; | 476 return future; |
477 } | 477 } |
478 | 478 |
479 /** | 479 /** |
480 * Provide at most the first [n] values of this stream. | 480 * Provides at most the first [n] values of this stream. |
481 * | 481 * |
482 * Forwards the first [n] data events of this stream, and all error | 482 * Forwards the first [n] data events of this stream, and all error |
483 * events, to the returned stream, and ends with a done event. | 483 * events, to the returned stream, and ends with a done event. |
484 * | 484 * |
485 * If this stream produces fewer than [count] values before it's done, | 485 * If this stream produces fewer than [count] values before it's done, |
486 * so will the returned stream. | 486 * so will the returned stream. |
487 */ | 487 */ |
488 Stream<T> take(int count) { | 488 Stream<T> take(int count) { |
489 return new _TakeStream(this, count); | 489 return new _TakeStream(this, count); |
490 } | 490 } |
(...skipping 23 matching lines...) Expand all Loading... |
514 * Error and done events are provided by the returned stream unmodified. | 514 * Error and done events are provided by the returned stream unmodified. |
515 * | 515 * |
516 * Starting with the first data event where [test] returns true for the | 516 * Starting with the first data event where [test] returns true for the |
517 * event data, the returned stream will have the same events as this stream. | 517 * event data, the returned stream will have the same events as this stream. |
518 */ | 518 */ |
519 Stream<T> skipWhile(bool test(T value)) { | 519 Stream<T> skipWhile(bool test(T value)) { |
520 return new _SkipWhileStream(this, test); | 520 return new _SkipWhileStream(this, test); |
521 } | 521 } |
522 | 522 |
523 /** | 523 /** |
524 * Skip data events if they are equal to the previous data event. | 524 * Skips data events if they are equal to the previous data event. |
525 * | 525 * |
526 * The returned stream provides the same events as this stream, except | 526 * The returned stream provides the same events as this stream, except |
527 * that it never provides two consequtive data events that are equal. | 527 * that it never provides two consequtive data events that are equal. |
528 * | 528 * |
529 * Equality is determined by the provided [equals] method. If that is | 529 * Equality is determined by the provided [equals] method. If that is |
530 * omitted, the '==' operator on the last provided data element is used. | 530 * omitted, the '==' operator on the last provided data element is used. |
531 */ | 531 */ |
532 Stream<T> distinct([bool equals(T previous, T next)]) { | 532 Stream<T> distinct([bool equals(T previous, T next)]) { |
533 return new _DistinctStream(this, equals); | 533 return new _DistinctStream(this, equals); |
534 } | 534 } |
(...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
617 future._setValue(result); | 617 future._setValue(result); |
618 return; | 618 return; |
619 } | 619 } |
620 future._setError(new AsyncError(new StateError("No elements"))); | 620 future._setError(new AsyncError(new StateError("No elements"))); |
621 }, | 621 }, |
622 unsubscribeOnError: true); | 622 unsubscribeOnError: true); |
623 return future; | 623 return future; |
624 } | 624 } |
625 | 625 |
626 /** | 626 /** |
627 * Find the first element of this stream matching [test]. | 627 * Finds the first element of this stream matching [test]. |
628 * | 628 * |
629 * Returns a future that is filled with the first element of this stream | 629 * Returns a future that is filled with the first element of this stream |
630 * that [test] returns true for. | 630 * that [test] returns true for. |
631 * | 631 * |
632 * If no such element is found before this stream is done, and a | 632 * If no such element is found before this stream is done, and a |
633 * [defaultValue] function is provided, the result of calling [defaultValue] | 633 * [defaultValue] function is provided, the result of calling [defaultValue] |
634 * becomes the value of the future. | 634 * becomes the value of the future. |
635 * | 635 * |
636 * If an error occurs, or if this stream ends without finding a match and | 636 * If an error occurs, or if this stream ends without finding a match and |
637 * with no [defaultValue] function provided, the future will receive an | 637 * with no [defaultValue] function provided, the future will receive an |
(...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
1104 } | 1104 } |
1105 | 1105 |
1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { | 1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { |
1107 _StreamOutputSink _sink; | 1107 _StreamOutputSink _sink; |
1108 _StreamOutputSinkWrapper(this._sink); | 1108 _StreamOutputSinkWrapper(this._sink); |
1109 | 1109 |
1110 void add(T data) => _sink._sendData(data); | 1110 void add(T data) => _sink._sendData(data); |
1111 void signalError(AsyncError error) => _sink._sendError(error); | 1111 void signalError(AsyncError error) => _sink._sendError(error); |
1112 void close() => _sink._sendDone(); | 1112 void close() => _sink._sendDone(); |
1113 } | 1113 } |
OLD | NEW |