| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 59 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 70 | 70 |
| 71 /** | 71 /** |
| 72 * Creates a single-subscription stream that gets its data from [data]. | 72 * Creates a single-subscription stream that gets its data from [data]. |
| 73 */ | 73 */ |
| 74 factory Stream.fromIterable(Iterable<T> data) { | 74 factory Stream.fromIterable(Iterable<T> data) { |
| 75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); | 75 _PendingEvents iterableEvents = new _IterablePendingEvents<T>(data); |
| 76 return new _GeneratedSingleStreamImpl<T>(iterableEvents); | 76 return new _GeneratedSingleStreamImpl<T>(iterableEvents); |
| 77 } | 77 } |
| 78 | 78 |
| 79 /** | 79 /** |
| 80 * Whether the stream is a broadcast stream. | 80 * Reports whether this stream is a broadcast stream. |
| 81 */ | 81 */ |
| 82 bool get isBroadcast => false; | 82 bool get isBroadcast => false; |
| 83 | 83 |
| 84 /** | 84 /** |
| 85 * Returns a multi-subscription stream that produces the same events as this. | 85 * Returns a multi-subscription stream that produces the same events as this. |
| 86 * | 86 * |
| 87 * If this stream is single-subscription, return a new stream that allows | 87 * If this stream is single-subscription, return a new stream that allows |
| 88 * multiple subscribers. It will subscribe to this stream when its first | 88 * multiple subscribers. It will subscribe to this stream when its first |
| 89 * subscriber is added, and unsubscribe again when the last subscription is | 89 * subscriber is added, and unsubscribe again when the last subscription is |
| 90 * cancelled. | 90 * cancelled. |
| 91 * | 91 * |
| 92 * If this stream is already a broadcast stream, it is returned unmodified. | 92 * If this stream is already a broadcast stream, it is returned unmodified. |
| 93 */ | 93 */ |
| 94 Stream<T> asBroadcastStream() { | 94 Stream<T> asBroadcastStream() { |
| 95 if (isBroadcast) return this; | 95 if (isBroadcast) return this; |
| 96 return new _SingleStreamMultiplexer<T>(this); | 96 return new _SingleStreamMultiplexer<T>(this); |
| 97 } | 97 } |
| 98 | 98 |
| 99 /** | 99 /** |
| 100 * Add a subscription to this stream. | 100 * Adds a subscription to this stream. |
| 101 * | 101 * |
| 102 * On each data event from this stream, the subscribers [onData] handler | 102 * On each data event from this stream, the subscriber's [onData] handler |
| 103 * is called. If [onData] is null, nothing happens. | 103 * is called. If [onData] is null, nothing happens. |
| 104 * | 104 * |
| 105 * On errors from this stream, the [onError] handler is given a | 105 * On errors from this stream, the [onError] handler is given a |
| 106 * [AsyncError] object describing the error. | 106 * [AsyncError] object describing the error. |
| 107 * | 107 * |
| 108 * If this stream closes, the [onDone] handler is called. | 108 * If this stream closes, the [onDone] handler is called. |
| 109 * | 109 * |
| 110 * If [unsubscribeOnError] is true, the subscription is ended when | 110 * If [unsubscribeOnError] is true, the subscription is ended when |
| 111 * the first error is reported. The default is false. | 111 * the first error is reported. The default is false. |
| 112 */ | 112 */ |
| 113 StreamSubscription<T> listen(void onData(T event), | 113 StreamSubscription<T> listen(void onData(T event), |
| 114 { void onError(AsyncError error), | 114 { void onError(AsyncError error), |
| 115 void onDone(), | 115 void onDone(), |
| 116 bool unsubscribeOnError}); | 116 bool unsubscribeOnError}); |
| 117 | 117 |
| 118 /** | 118 /** |
| 119 * Creates a new stream from this stream that discards some data events. | 119 * Creates a new stream from this stream that discards some data events. |
| 120 * | 120 * |
| 121 * The new stream sends the same error and done events as this stream, | 121 * The new stream sends the same error and done events as this stream, |
| 122 * but it only sends the data events that satisfy the [test]. | 122 * but it only sends the data events that satisfy the [test]. |
| 123 */ | 123 */ |
| 124 Stream<T> where(bool test(T event)) { | 124 Stream<T> where(bool test(T event)) { |
| 125 return new _WhereStream<T>(this, test); | 125 return new _WhereStream<T>(this, test); |
| 126 } | 126 } |
| 127 | 127 |
| 128 /** | 128 /** |
| 129 * Create a new stream that converts each element of this stream | 129 * Creates a new stream that converts each element of this stream |
| 130 * to a new value using the [convert] function. | 130 * to a new value using the [convert] function. |
| 131 */ | 131 */ |
| 132 Stream map(convert(T event)) { | 132 Stream map(convert(T event)) { |
| 133 return new _MapStream<T, dynamic>(this, convert); | 133 return new _MapStream<T, dynamic>(this, convert); |
| 134 } | 134 } |
| 135 | 135 |
| 136 /** | 136 /** |
| 137 * Create a wrapper Stream that intercepts some errors from this stream. | 137 * Creates a wrapper Stream that intercepts some errors from this stream. |
| 138 * | 138 * |
| 139 * If this stream sends an error that matches [test], then it is intercepted | 139 * If this stream sends an error that matches [test], then it is intercepted |
| 140 * by the [handle] function. | 140 * by the [handle] function. |
| 141 * | 141 * |
| 142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns | 142 * An [AsyncError] [:e:] is matched by a test function if [:test(e):] returns |
| 143 * true. If [test] is omitted, every error is considered matching. | 143 * true. If [test] is omitted, every error is considered matching. |
| 144 * | 144 * |
| 145 * If the error is intercepted, the [handle] function can decide what to do | 145 * If the error is intercepted, the [handle] function can decide what to do |
| 146 * with it. It can throw if it wants to raise a new (or the same) error, | 146 * with it. It can throw if it wants to raise a new (or the same) error, |
| 147 * or simply return to make the stream forget the error. | 147 * or simply return to make the stream forget the error. |
| 148 * | 148 * |
| 149 * If you need to transform an error into a data event, use the more generic | 149 * If you need to transform an error into a data event, use the more generic |
| 150 * [Stream.transformEvent] to handle the event by writing a data event to | 150 * [Stream.transformEvent] to handle the event by writing a data event to |
| 151 * the output sink | 151 * the output sink |
| 152 */ | 152 */ |
| 153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { | 153 Stream<T> handleError(void handle(AsyncError error), { bool test(error) }) { |
| 154 return new _HandleErrorStream<T>(this, handle, test); | 154 return new _HandleErrorStream<T>(this, handle, test); |
| 155 } | 155 } |
| 156 | 156 |
| 157 /** | 157 /** |
| 158 * Create a new stream from this stream that converts each element | 158 * Creates a new stream from this stream that converts each element |
| 159 * into zero or more events. | 159 * into zero or more events. |
| 160 * | 160 * |
| 161 * Each incoming event is converted to an [Iterable] of new events, | 161 * Each incoming event is converted to an [Iterable] of new events, |
| 162 * and each of these new events are then sent by the returned stream | 162 * and each of these new events are then sent by the returned stream |
| 163 * in order. | 163 * in order. |
| 164 */ | 164 */ |
| 165 Stream expand(Iterable convert(T value)) { | 165 Stream expand(Iterable convert(T value)) { |
| 166 return new _ExpandStream<T, dynamic>(this, convert); | 166 return new _ExpandStream<T, dynamic>(this, convert); |
| 167 } | 167 } |
| 168 | 168 |
| 169 /** | 169 /** |
| 170 * Bind this stream as the input of the provided [StreamConsumer]. | 170 * Binds this stream as the input of the provided [StreamConsumer]. |
| 171 */ | 171 */ |
| 172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) { | 172 Future pipe(StreamConsumer<T, dynamic> streamConsumer) { |
| 173 return streamConsumer.consume(this); | 173 return streamConsumer.consume(this); |
| 174 } | 174 } |
| 175 | 175 |
| 176 /** | 176 /** |
| 177 * Chain this stream as the input of the provided [StreamTransformer]. | 177 * Chains this stream as the input of the provided [StreamTransformer]. |
| 178 * | 178 * |
| 179 * Returns the result of [:streamTransformer.bind:] itself. | 179 * Returns the result of [:streamTransformer.bind:] itself. |
| 180 */ | 180 */ |
| 181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 181 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { |
| 182 return streamTransformer.bind(this); | 182 return streamTransformer.bind(this); |
| 183 } | 183 } |
| 184 | 184 |
| 185 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 185 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 186 Future reduce(var initialValue, combine(var previous, T element)) { | 186 Future reduce(var initialValue, combine(var previous, T element)) { |
| 187 _FutureImpl result = new _FutureImpl(); | 187 _FutureImpl result = new _FutureImpl(); |
| (...skipping 30 matching lines...) Expand all Loading... |
| 218 onDone: () { | 218 onDone: () { |
| 219 sink.close(); | 219 sink.close(); |
| 220 result._setValue(null); | 220 result._setValue(null); |
| 221 }, | 221 }, |
| 222 unsubscribeOnError: unsubscribeOnError); | 222 unsubscribeOnError: unsubscribeOnError); |
| 223 return result; | 223 return result; |
| 224 } | 224 } |
| 225 | 225 |
| 226 | 226 |
| 227 /** | 227 /** |
| 228 * Check whether [match] occurs in the elements provided by this stream. | 228 * Checks whether [match] occurs in the elements provided by this stream. |
| 229 * | 229 * |
| 230 * Completes the [Future] when the answer is known. | 230 * Completes the [Future] when the answer is known. |
| 231 * If this stream reports an error, the [Future] will report that error. | 231 * If this stream reports an error, the [Future] will report that error. |
| 232 */ | 232 */ |
| 233 Future<bool> contains(T match) { | 233 Future<bool> contains(T match) { |
| 234 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 234 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 235 StreamSubscription subscription; | 235 StreamSubscription subscription; |
| 236 subscription = this.listen( | 236 subscription = this.listen( |
| 237 // TODO(ahe): Restore type when feature is implemented in dart2js | 237 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 238 // checked mode. http://dartbug.com/7733 | 238 // checked mode. http://dartbug.com/7733 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 250 }, | 250 }, |
| 251 onError: future._setError, | 251 onError: future._setError, |
| 252 onDone: () { | 252 onDone: () { |
| 253 future._setValue(false); | 253 future._setValue(false); |
| 254 }, | 254 }, |
| 255 unsubscribeOnError: true); | 255 unsubscribeOnError: true); |
| 256 return future; | 256 return future; |
| 257 } | 257 } |
| 258 | 258 |
| 259 /** | 259 /** |
| 260 * Check whether [test] accepts all elements provided by this stream. | 260 * Checks whether [test] accepts all elements provided by this stream. |
| 261 * | 261 * |
| 262 * Completes the [Future] when the answer is known. | 262 * Completes the [Future] when the answer is known. |
| 263 * If this stream reports an error, the [Future] will report that error. | 263 * If this stream reports an error, the [Future] will report that error. |
| 264 */ | 264 */ |
| 265 Future<bool> every(bool test(T element)) { | 265 Future<bool> every(bool test(T element)) { |
| 266 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 266 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 267 StreamSubscription subscription; | 267 StreamSubscription subscription; |
| 268 subscription = this.listen( | 268 subscription = this.listen( |
| 269 // TODO(ahe): Restore type when feature is implemented in dart2js | 269 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 270 // checked mode. http://dartbug.com/7733 | 270 // checked mode. http://dartbug.com/7733 |
| (...skipping 11 matching lines...) Expand all Loading... |
| 282 }, | 282 }, |
| 283 onError: future._setError, | 283 onError: future._setError, |
| 284 onDone: () { | 284 onDone: () { |
| 285 future._setValue(true); | 285 future._setValue(true); |
| 286 }, | 286 }, |
| 287 unsubscribeOnError: true); | 287 unsubscribeOnError: true); |
| 288 return future; | 288 return future; |
| 289 } | 289 } |
| 290 | 290 |
| 291 /** | 291 /** |
| 292 * Check whether [test] accepts any element provided by this stream. | 292 * Checks whether [test] accepts any element provided by this stream. |
| 293 * | 293 * |
| 294 * Completes the [Future] when the answer is known. | 294 * Completes the [Future] when the answer is known. |
| 295 * If this stream reports an error, the [Future] will report that error. | 295 * If this stream reports an error, the [Future] will report that error. |
| 296 */ | 296 */ |
| 297 Future<bool> any(bool test(T element)) { | 297 Future<bool> any(bool test(T element)) { |
| 298 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 298 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 299 StreamSubscription subscription; | 299 StreamSubscription subscription; |
| 300 subscription = this.listen( | 300 subscription = this.listen( |
| 301 // TODO(ahe): Restore type when feature is implemented in dart2js | 301 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 302 // checked mode. http://dartbug.com/7733 | 302 // checked mode. http://dartbug.com/7733 |
| (...skipping 130 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 433 future._setValue(false); | 433 future._setValue(false); |
| 434 }, | 434 }, |
| 435 onError: future._setError, | 435 onError: future._setError, |
| 436 onDone: () { | 436 onDone: () { |
| 437 future._setValue(true); | 437 future._setValue(true); |
| 438 }, | 438 }, |
| 439 unsubscribeOnError: true); | 439 unsubscribeOnError: true); |
| 440 return future; | 440 return future; |
| 441 } | 441 } |
| 442 | 442 |
| 443 /** Collect the data of this stream in a [List]. */ | 443 /** Collects the data of this stream in a [List]. */ |
| 444 Future<List<T>> toList() { | 444 Future<List<T>> toList() { |
| 445 List<T> result = <T>[]; | 445 List<T> result = <T>[]; |
| 446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); | 446 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
| 447 this.listen( | 447 this.listen( |
| 448 // TODO(ahe): Restore type when feature is implemented in dart2js | 448 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 449 // checked mode. http://dartbug.com/7733 | 449 // checked mode. http://dartbug.com/7733 |
| 450 (/*T*/ data) { | 450 (/*T*/ data) { |
| 451 result.add(data); | 451 result.add(data); |
| 452 }, | 452 }, |
| 453 onError: future._setError, | 453 onError: future._setError, |
| 454 onDone: () { | 454 onDone: () { |
| 455 future._setValue(result); | 455 future._setValue(result); |
| 456 }, | 456 }, |
| 457 unsubscribeOnError: true); | 457 unsubscribeOnError: true); |
| 458 return future; | 458 return future; |
| 459 } | 459 } |
| 460 | 460 |
| 461 /** Collect the data of this stream in a [Set]. */ | 461 /** Collects the data of this stream in a [Set]. */ |
| 462 Future<Set<T>> toSet() { | 462 Future<Set<T>> toSet() { |
| 463 Set<T> result = new Set<T>(); | 463 Set<T> result = new Set<T>(); |
| 464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); | 464 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
| 465 this.listen( | 465 this.listen( |
| 466 // TODO(ahe): Restore type when feature is implemented in dart2js | 466 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 467 // checked mode. http://dartbug.com/7733 | 467 // checked mode. http://dartbug.com/7733 |
| 468 (/*T*/ data) { | 468 (/*T*/ data) { |
| 469 result.add(data); | 469 result.add(data); |
| 470 }, | 470 }, |
| 471 onError: future._setError, | 471 onError: future._setError, |
| 472 onDone: () { | 472 onDone: () { |
| 473 future._setValue(result); | 473 future._setValue(result); |
| 474 }, | 474 }, |
| 475 unsubscribeOnError: true); | 475 unsubscribeOnError: true); |
| 476 return future; | 476 return future; |
| 477 } | 477 } |
| 478 | 478 |
| 479 /** | 479 /** |
| 480 * Provide at most the first [n] values of this stream. | 480 * Provides at most the first [n] values of this stream. |
| 481 * | 481 * |
| 482 * Forwards the first [n] data events of this stream, and all error | 482 * Forwards the first [n] data events of this stream, and all error |
| 483 * events, to the returned stream, and ends with a done event. | 483 * events, to the returned stream, and ends with a done event. |
| 484 * | 484 * |
| 485 * If this stream produces fewer than [count] values before it's done, | 485 * If this stream produces fewer than [count] values before it's done, |
| 486 * so will the returned stream. | 486 * so will the returned stream. |
| 487 */ | 487 */ |
| 488 Stream<T> take(int count) { | 488 Stream<T> take(int count) { |
| 489 return new _TakeStream(this, count); | 489 return new _TakeStream(this, count); |
| 490 } | 490 } |
| (...skipping 23 matching lines...) Expand all Loading... |
| 514 * Error and done events are provided by the returned stream unmodified. | 514 * Error and done events are provided by the returned stream unmodified. |
| 515 * | 515 * |
| 516 * Starting with the first data event where [test] returns true for the | 516 * Starting with the first data event where [test] returns true for the |
| 517 * event data, the returned stream will have the same events as this stream. | 517 * event data, the returned stream will have the same events as this stream. |
| 518 */ | 518 */ |
| 519 Stream<T> skipWhile(bool test(T value)) { | 519 Stream<T> skipWhile(bool test(T value)) { |
| 520 return new _SkipWhileStream(this, test); | 520 return new _SkipWhileStream(this, test); |
| 521 } | 521 } |
| 522 | 522 |
| 523 /** | 523 /** |
| 524 * Skip data events if they are equal to the previous data event. | 524 * Skips data events if they are equal to the previous data event. |
| 525 * | 525 * |
| 526 * The returned stream provides the same events as this stream, except | 526 * The returned stream provides the same events as this stream, except |
| 527 * that it never provides two consequtive data events that are equal. | 527 * that it never provides two consequtive data events that are equal. |
| 528 * | 528 * |
| 529 * Equality is determined by the provided [equals] method. If that is | 529 * Equality is determined by the provided [equals] method. If that is |
| 530 * omitted, the '==' operator on the last provided data element is used. | 530 * omitted, the '==' operator on the last provided data element is used. |
| 531 */ | 531 */ |
| 532 Stream<T> distinct([bool equals(T previous, T next)]) { | 532 Stream<T> distinct([bool equals(T previous, T next)]) { |
| 533 return new _DistinctStream(this, equals); | 533 return new _DistinctStream(this, equals); |
| 534 } | 534 } |
| (...skipping 82 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 617 future._setValue(result); | 617 future._setValue(result); |
| 618 return; | 618 return; |
| 619 } | 619 } |
| 620 future._setError(new AsyncError(new StateError("No elements"))); | 620 future._setError(new AsyncError(new StateError("No elements"))); |
| 621 }, | 621 }, |
| 622 unsubscribeOnError: true); | 622 unsubscribeOnError: true); |
| 623 return future; | 623 return future; |
| 624 } | 624 } |
| 625 | 625 |
| 626 /** | 626 /** |
| 627 * Find the first element of this stream matching [test]. | 627 * Finds the first element of this stream matching [test]. |
| 628 * | 628 * |
| 629 * Returns a future that is filled with the first element of this stream | 629 * Returns a future that is filled with the first element of this stream |
| 630 * that [test] returns true for. | 630 * that [test] returns true for. |
| 631 * | 631 * |
| 632 * If no such element is found before this stream is done, and a | 632 * If no such element is found before this stream is done, and a |
| 633 * [defaultValue] function is provided, the result of calling [defaultValue] | 633 * [defaultValue] function is provided, the result of calling [defaultValue] |
| 634 * becomes the value of the future. | 634 * becomes the value of the future. |
| 635 * | 635 * |
| 636 * If an error occurs, or if this stream ends without finding a match and | 636 * If an error occurs, or if this stream ends without finding a match and |
| 637 * with no [defaultValue] function provided, the future will receive an | 637 * with no [defaultValue] function provided, the future will receive an |
| (...skipping 466 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1104 } | 1104 } |
| 1105 | 1105 |
| 1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { | 1106 class _StreamOutputSinkWrapper<T> implements StreamSink<T> { |
| 1107 _StreamOutputSink _sink; | 1107 _StreamOutputSink _sink; |
| 1108 _StreamOutputSinkWrapper(this._sink); | 1108 _StreamOutputSinkWrapper(this._sink); |
| 1109 | 1109 |
| 1110 void add(T data) => _sink._sendData(data); | 1110 void add(T data) => _sink._sendData(data); |
| 1111 void signalError(AsyncError error) => _sink._sendError(error); | 1111 void signalError(AsyncError error) => _sink._sendError(error); |
| 1112 void close() => _sink._sendDone(); | 1112 void close() => _sink._sendDone(); |
| 1113 } | 1113 } |
| OLD | NEW |