| OLD | NEW |
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.async; | 5 part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 235 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 246 * Chains this stream as the input of the provided [StreamTransformer]. | 246 * Chains this stream as the input of the provided [StreamTransformer]. |
| 247 * | 247 * |
| 248 * Returns the result of [:streamTransformer.bind:] itself. | 248 * Returns the result of [:streamTransformer.bind:] itself. |
| 249 */ | 249 */ |
| 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { | 250 Stream transform(StreamTransformer<T, dynamic> streamTransformer) { |
| 251 return streamTransformer.bind(this); | 251 return streamTransformer.bind(this); |
| 252 } | 252 } |
| 253 | 253 |
| 254 /** | 254 /** |
| 255 * Reduces a sequence of values by repeatedly applying [combine]. | 255 * Reduces a sequence of values by repeatedly applying [combine]. |
| 256 * | |
| 257 * *WARNING UPCOMING API-CHANGE*: This method will be changed so that | |
| 258 * it doesn't take an initial value. Use [fold] instead. | |
| 259 */ | 256 */ |
| 260 Future reduce(var initialValue, combine(var previous, T element)) { | 257 Future<T> reduce(T combine(T previous, T element)) { |
| 261 return fold(initialValue, combine); | 258 _FutureImpl<T> result = new _FutureImpl<T>(); |
| 259 bool seenFirst = false; |
| 260 T value; |
| 261 StreamSubscription subscription; |
| 262 subscription = this.listen( |
| 263 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 264 // checked mode. http://dartbug.com/7733 |
| 265 (/* T */ element) { |
| 266 if (seenFirst) { |
| 267 _runUserCode(() => combine(value, element), |
| 268 (T newValue) { value = newValue; }, |
| 269 _cancelAndError(subscription, result)); |
| 270 } else { |
| 271 value = element; |
| 272 seenFirst = true; |
| 273 } |
| 274 }, |
| 275 onError: result._setError, |
| 276 onDone: () { |
| 277 if (!seenFirst) { |
| 278 result._setError(new StateError("No elements")); |
| 279 } else { |
| 280 result._setValue(value); |
| 281 } |
| 282 }, |
| 283 unsubscribeOnError: true |
| 284 ); |
| 285 return result; |
| 262 } | 286 } |
| 263 | 287 |
| 264 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 288 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 265 Future fold(var initialValue, combine(var previous, T element)) { | 289 Future fold(var initialValue, combine(var previous, T element)) { |
| 266 _FutureImpl result = new _FutureImpl(); | 290 _FutureImpl result = new _FutureImpl(); |
| 267 var value = initialValue; | 291 var value = initialValue; |
| 268 StreamSubscription subscription; | 292 StreamSubscription subscription; |
| 269 subscription = this.listen( | 293 subscription = this.listen( |
| 270 // TODO(ahe): Restore type when feature is implemented in dart2js | 294 // TODO(ahe): Restore type when feature is implemented in dart2js |
| 271 // checked mode. http://dartbug.com/7733 | 295 // checked mode. http://dartbug.com/7733 |
| 272 (/*T*/ element) { | 296 (/*T*/ element) { |
| 273 _runUserCode( | 297 _runUserCode( |
| 274 () => combine(value, element), | 298 () => combine(value, element), |
| 275 (result) { value = result; }, | 299 (newValue) { value = newValue; }, |
| 276 _cancelAndError(subscription, result) | 300 _cancelAndError(subscription, result) |
| 277 ); | 301 ); |
| 278 }, | 302 }, |
| 279 onError: (AsyncError e) { | 303 onError: (AsyncError e) { |
| 280 result._setError(e); | 304 result._setError(e); |
| 281 }, | 305 }, |
| 282 onDone: () { | 306 onDone: () { |
| 283 result._setValue(value); | 307 result._setValue(value); |
| 284 }, | 308 }, |
| 285 unsubscribeOnError: true); | 309 unsubscribeOnError: true); |
| (...skipping 121 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 407 this.listen( | 431 this.listen( |
| 408 (_) { count++; }, | 432 (_) { count++; }, |
| 409 onError: future._setError, | 433 onError: future._setError, |
| 410 onDone: () { | 434 onDone: () { |
| 411 future._setValue(count); | 435 future._setValue(count); |
| 412 }, | 436 }, |
| 413 unsubscribeOnError: true); | 437 unsubscribeOnError: true); |
| 414 return future; | 438 return future; |
| 415 } | 439 } |
| 416 | 440 |
| 417 /** | |
| 418 * Finds the least element in the stream. | |
| 419 * | |
| 420 * If the stream is empty, the result is [:null:]. | |
| 421 * Otherwise the result is a value from the stream that is not greater | |
| 422 * than any other value from the stream (according to [compare], which must | |
| 423 * be a [Comparator]). | |
| 424 * | |
| 425 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
| 426 * | |
| 427 * *Deprecated*. Use [reduce] with a binary min method if needed. | |
| 428 */ | |
| 429 Future<T> min([int compare(T a, T b)]) { | |
| 430 if (compare == null) { | |
| 431 var defaultCompare = Comparable.compare; | |
| 432 compare = defaultCompare; | |
| 433 } | |
| 434 _FutureImpl<T> future = new _FutureImpl<T>(); | |
| 435 StreamSubscription subscription; | |
| 436 T min = null; | |
| 437 subscription = this.listen( | |
| 438 // TODO(ahe): Restore type when feature is implemented in dart2js | |
| 439 // checked mode. http://dartbug.com/7733 | |
| 440 (/*T*/ value) { | |
| 441 min = value; | |
| 442 subscription.onData((T value) { | |
| 443 _runUserCode( | |
| 444 () => compare(min, value) > 0, | |
| 445 (bool foundSmaller) { | |
| 446 if (foundSmaller) { | |
| 447 min = value; | |
| 448 } | |
| 449 }, | |
| 450 _cancelAndError(subscription, future) | |
| 451 ); | |
| 452 }); | |
| 453 }, | |
| 454 onError: future._setError, | |
| 455 onDone: () { | |
| 456 future._setValue(min); | |
| 457 }, | |
| 458 unsubscribeOnError: true | |
| 459 ); | |
| 460 return future; | |
| 461 } | |
| 462 | |
| 463 /** | |
| 464 * Finds the largest element in the stream. | |
| 465 * | |
| 466 * If the stream is empty, the result is [:null:]. | |
| 467 * Otherwise the result is an value from the stream that is not smaller | |
| 468 * than any other value from the stream (according to [compare], which must | |
| 469 * be a [Comparator]). | |
| 470 * | |
| 471 * If [compare] is omitted, it defaults to [Comparable.compare]. | |
| 472 * | |
| 473 * *Deprecated*. Use [reduce] with a binary max method if needed. | |
| 474 */ | |
| 475 Future<T> max([int compare(T a, T b)]) { | |
| 476 if (compare == null) { | |
| 477 var defaultCompare = Comparable.compare; | |
| 478 compare = defaultCompare; | |
| 479 } | |
| 480 _FutureImpl<T> future = new _FutureImpl<T>(); | |
| 481 StreamSubscription subscription; | |
| 482 T max = null; | |
| 483 subscription = this.listen( | |
| 484 // TODO(ahe): Restore type when feature is implemented in dart2js | |
| 485 // checked mode. http://dartbug.com/7733 | |
| 486 (/*T*/ value) { | |
| 487 max = value; | |
| 488 subscription.onData((T value) { | |
| 489 _runUserCode( | |
| 490 () => compare(max, value) < 0, | |
| 491 (bool foundGreater) { | |
| 492 if (foundGreater) { | |
| 493 max = value; | |
| 494 } | |
| 495 }, | |
| 496 _cancelAndError(subscription, future) | |
| 497 ); | |
| 498 }); | |
| 499 }, | |
| 500 onError: future._setError, | |
| 501 onDone: () { | |
| 502 future._setValue(max); | |
| 503 }, | |
| 504 unsubscribeOnError: true | |
| 505 ); | |
| 506 return future; | |
| 507 } | |
| 508 | |
| 509 /** Reports whether this stream contains any elements. */ | 441 /** Reports whether this stream contains any elements. */ |
| 510 Future<bool> get isEmpty { | 442 Future<bool> get isEmpty { |
| 511 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 443 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 512 StreamSubscription subscription; | 444 StreamSubscription subscription; |
| 513 subscription = this.listen( | 445 subscription = this.listen( |
| 514 (_) { | 446 (_) { |
| 515 subscription.cancel(); | 447 subscription.cancel(); |
| 516 future._setValue(false); | 448 future._setValue(false); |
| 517 }, | 449 }, |
| 518 onError: future._setError, | 450 onError: future._setError, |
| (...skipping 708 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 1227 | 1159 |
| 1228 /* TODO(8997): Implement EventSink instead, */ | 1160 /* TODO(8997): Implement EventSink instead, */ |
| 1229 class _EventOutputSinkWrapper<T> extends StreamSink<T> { | 1161 class _EventOutputSinkWrapper<T> extends StreamSink<T> { |
| 1230 _EventOutputSink _sink; | 1162 _EventOutputSink _sink; |
| 1231 _EventOutputSinkWrapper(this._sink); | 1163 _EventOutputSinkWrapper(this._sink); |
| 1232 | 1164 |
| 1233 void add(T data) { _sink._sendData(data); } | 1165 void add(T data) { _sink._sendData(data); } |
| 1234 void addError(AsyncError error) { _sink._sendError(error); } | 1166 void addError(AsyncError error) { _sink._sendError(error); } |
| 1235 void close() { _sink._sendDone(); } | 1167 void close() { _sink._sendDone(); } |
| 1236 } | 1168 } |
| OLD | NEW |