Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 // part of dart.async; | 5 // part of dart.async; |
| 6 | 6 |
| 7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
| 8 // Core Stream types | 8 // Core Stream types |
| 9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
| 10 | 10 |
| (...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 55 * is called. If [onData] is null, nothing happens. | 55 * is called. If [onData] is null, nothing happens. |
| 56 * | 56 * |
| 57 * On errors from this stream, the [onError] handler is given a | 57 * On errors from this stream, the [onError] handler is given a |
| 58 * [AsyncError] object describing the error. | 58 * [AsyncError] object describing the error. |
| 59 * | 59 * |
| 60 * If this stream closes, the [onDone] handler is called. | 60 * If this stream closes, the [onDone] handler is called. |
| 61 * | 61 * |
| 62 * If [unsubscribeOnError] is true, the subscription is ended when | 62 * If [unsubscribeOnError] is true, the subscription is ended when |
| 63 * the first error is reported. The default is false. | 63 * the first error is reported. The default is false. |
| 64 */ | 64 */ |
| 65 StreamSubscription<T> subscribe({void onData(T event), | 65 StreamSubscription<T> listen(void onData(T event), |
| 66 void onError(AsyncError error), | 66 { void onError(AsyncError error), |
| 67 void onDone(), | 67 void onDone(), |
| 68 bool unsubscribeOnError}); | 68 bool unsubscribeOnError}); |
| 69 | 69 |
| 70 /** | 70 /** |
| 71 * Creates a new stream from this stream that discards some data events. | 71 * Creates a new stream from this stream that discards some data events. |
| 72 * | 72 * |
| 73 * The new stream sends the same error and done events as this stream, | 73 * The new stream sends the same error and done events as this stream, |
| 74 * but it only sends the data events that satisfy the [test]. | 74 * but it only sends the data events that satisfy the [test]. |
| 75 */ | 75 */ |
| 76 Stream<T> where(bool test(T event)) { | 76 Stream<T> where(bool test(T event)) { |
| 77 return this.chain(new WhereStream<T>(test)); | 77 return this.chain(new WhereStream<T>(test)); |
| 78 } | 78 } |
| (...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 142 */ | 142 */ |
| 143 Stream transform(StreamTransformer transformer) { | 143 Stream transform(StreamTransformer transformer) { |
| 144 return this.chain(new TransformStream(transformer)); | 144 return this.chain(new TransformStream(transformer)); |
| 145 } | 145 } |
| 146 | 146 |
| 147 | 147 |
| 148 /** Reduces a sequence of values by repeatedly applying [combine]. */ | 148 /** Reduces a sequence of values by repeatedly applying [combine]. */ |
| 149 Future reduce(var initialValue, combine(var previous, T element)) { | 149 Future reduce(var initialValue, combine(var previous, T element)) { |
| 150 Completer completer = new Completer(); | 150 Completer completer = new Completer(); |
| 151 var value = initialValue; | 151 var value = initialValue; |
| 152 StreamSubscription subscription = this.subscribe(unsubscribeOnError: true); | 152 StreamSubscription subscription; |
| 153 subscription..onData((T element) { | 153 subscription = this.listen( |
| 154 try { | 154 (T element) { |
| 155 value = combine(value, element); | 155 try { |
| 156 } catch (e, s) { | 156 value = combine(value, element); |
| 157 subscription.unsubscribe(); | 157 } catch (e, s) { |
| 158 completer.completeError(e, s); | 158 subscription.cancel(); |
| 159 } | 159 completer.completeError(e, s); |
| 160 })..onError((AsyncError e) { | 160 } |
| 161 completer.completeError(e.error, e.stackTrace); | 161 }, |
| 162 })..onDone(() { | 162 unsubscribeOnError: true, |
| 163 completer.complete(value); | 163 onError: (AsyncError e) { |
| 164 }); | 164 completer.completeError(e.error, e.stackTrace); |
| 165 }, | |
| 166 onDone: () { | |
| 167 completer.complete(value); | |
| 168 }); | |
| 165 return completer.future; | 169 return completer.future; |
| 166 } | 170 } |
| 167 | 171 |
| 168 // Deprecated method, previously called 'pipe', retained for compatibility. | 172 // Deprecated method, previously called 'pipe', retained for compatibility. |
| 169 Signal pipeInto(Sink<T> sink, | 173 Signal pipeInto(Sink<T> sink, |
| 170 {void onError(AsyncError error), | 174 {void onError(AsyncError error), |
| 171 bool unsubscribeOnError}) { | 175 bool unsubscribeOnError}) { |
| 172 SignalCompleter completer = new SignalCompleter(); | 176 SignalCompleter completer = new SignalCompleter(); |
| 173 this.subscribe( | 177 this.listen( |
| 174 onData: sink.add, | 178 sink.add, |
| 175 onError: onError, | 179 onError: onError, |
| 176 onDone: () { | 180 onDone: () { |
| 177 sink.close(); | 181 sink.close(); |
| 178 completer.complete(); | 182 completer.complete(); |
| 179 }, | 183 }, |
| 180 unsubscribeOnError: unsubscribeOnError); | 184 unsubscribeOnError: unsubscribeOnError); |
| 181 return completer.signal; | 185 return completer.signal; |
| 182 } | 186 } |
| 183 | 187 |
| 184 | 188 |
| 185 /** | 189 /** |
| 186 * Check whether [match] occurs in the elements provided by this stream. | 190 * Check whether [match] occurs in the elements provided by this stream. |
| 187 * | 191 * |
| 188 * Completes the [Future] when the answer is known. | 192 * Completes the [Future] when the answer is known. |
| 189 * If this stream reports an error, the [Future] will report that error. | 193 * If this stream reports an error, the [Future] will report that error. |
| 190 */ | 194 */ |
| 191 Future<bool> contains(T match) { | 195 Future<bool> contains(T match) { |
| 192 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 196 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 193 StreamSubscription subscription; | 197 StreamSubscription subscription; |
| 194 subscription = subscribe( | 198 subscription = listen( |
|
Lasse Reichstein Nielsen
2013/01/04 08:17:55
Not your code (probably mine), but ...
This uses "
floitsch
2013/01/04 15:51:36
Added this. here at the other use-sites.
| |
| 195 onData: (T element) { | 199 (T element) { |
| 196 if (element == match) { | 200 if (element == match) { |
| 197 subscription.unsubscribe(); | 201 subscription.cancel(); |
| 198 future._setValue(true); | 202 future._setValue(true); |
| 199 } | 203 } |
| 200 }, | 204 }, |
| 201 onError: future._setError, | 205 onError: future._setError, |
| 202 onDone: () { | 206 onDone: () { |
| 203 future._setValue(false); | 207 future._setValue(false); |
| 204 }, | 208 }, |
| 205 unsubscribeOnError: true); | 209 unsubscribeOnError: true); |
| 206 return future; | 210 return future; |
| 207 } | 211 } |
| 208 | 212 |
| 209 /** | 213 /** |
| 210 * Check whether [test] accepts all elements provided by this stream. | 214 * Check whether [test] accepts all elements provided by this stream. |
| 211 * | 215 * |
| 212 * Completes the [Future] when the answer is known. | 216 * Completes the [Future] when the answer is known. |
| 213 * If this stream reports an error, the [Future] will report that error. | 217 * If this stream reports an error, the [Future] will report that error. |
| 214 */ | 218 */ |
| 215 Future<bool> every(bool test(T element)) { | 219 Future<bool> every(bool test(T element)) { |
| 216 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 220 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 217 StreamSubscription subscription; | 221 StreamSubscription subscription; |
| 218 subscription = subscribe( | 222 subscription = listen( |
| 219 onData: (T element) { | 223 (T element) { |
| 220 if (!test(element)) { | 224 if (!test(element)) { |
| 221 subscription.unsubscribe(); | 225 subscription.cancel(); |
| 222 future._setValue(false); | 226 future._setValue(false); |
| 223 } | 227 } |
| 224 }, | 228 }, |
| 225 onError: future._setError, | 229 onError: future._setError, |
| 226 onDone: () { | 230 onDone: () { |
| 227 future._setValue(true); | 231 future._setValue(true); |
| 228 }, | 232 }, |
| 229 unsubscribeOnError: true); | 233 unsubscribeOnError: true); |
| 230 return future; | 234 return future; |
| 231 } | 235 } |
| 232 | 236 |
| 233 /** | 237 /** |
| 234 * Check whether [test] accepts any element provided by this stream. | 238 * Check whether [test] accepts any element provided by this stream. |
| 235 * | 239 * |
| 236 * Completes the [Future] when the answer is known. | 240 * Completes the [Future] when the answer is known. |
| 237 * If this stream reports an error, the [Future] will report that error. | 241 * If this stream reports an error, the [Future] will report that error. |
| 238 */ | 242 */ |
| 239 Future<bool> any(bool test(T element)) { | 243 Future<bool> any(bool test(T element)) { |
| 240 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 244 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 241 StreamSubscription subscription; | 245 StreamSubscription subscription; |
| 242 subscription = subscribe( | 246 subscription = listen( |
| 243 onData: (T element) { | 247 (T element) { |
| 244 if (test(element)) { | 248 if (test(element)) { |
| 245 subscription.unsubscribe(); | 249 subscription.cancel(); |
| 246 future._setValue(true); | 250 future._setValue(true); |
| 247 } | 251 } |
| 248 }, | 252 }, |
| 249 onError: future._setError, | 253 onError: future._setError, |
| 250 onDone: () { | 254 onDone: () { |
| 251 future._setValue(false); | 255 future._setValue(false); |
| 252 }, | 256 }, |
| 253 unsubscribeOnError: true); | 257 unsubscribeOnError: true); |
| 254 return future; | 258 return future; |
| 255 } | 259 } |
| 256 | 260 |
| 257 | 261 |
| 258 /** Counts the elements in the stream. */ | 262 /** Counts the elements in the stream. */ |
| 259 Future<int> get length { | 263 Future<int> get length { |
| 260 _FutureImpl<int> future = new _FutureImpl<int>(); | 264 _FutureImpl<int> future = new _FutureImpl<int>(); |
| 261 int count = 0; | 265 int count = 0; |
| 262 subscribe( | 266 listen( |
| 263 onData: (_) { count++; }, | 267 (_) { count++; }, |
| 264 onError: future._setError, | 268 onError: future._setError, |
| 265 onDone: () { | 269 onDone: () { |
| 266 future._setValue(count); | 270 future._setValue(count); |
| 267 }, | 271 }, |
| 268 unsubscribeOnError: true); | 272 unsubscribeOnError: true); |
| 269 return future; | 273 return future; |
| 270 } | 274 } |
| 271 | 275 |
| 272 /** | 276 /** |
| 273 * Finds the least element in the stream. | 277 * Finds the least element in the stream. |
| 274 * | 278 * |
| 275 * If the stream is empty, the result is [:null:]. | 279 * If the stream is empty, the result is [:null:]. |
| 276 * Otherwise the result is a value from the stream that is not greater | 280 * Otherwise the result is a value from the stream that is not greater |
| 277 * than any other value from the stream (according to [compare], which must | 281 * than any other value from the stream (according to [compare], which must |
| 278 * be a [Comparator]). | 282 * be a [Comparator]). |
| 279 * | 283 * |
| 280 * If [compare] is omitted, it defaults to [Comparable.compare]. | 284 * If [compare] is omitted, it defaults to [Comparable.compare]. |
| 281 */ | 285 */ |
| 282 Future<T> min([int compare(T a, T b)]) { | 286 Future<T> min([int compare(T a, T b)]) { |
| 283 _FutureImpl<T> future = new _FutureImpl<T>(); | 287 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 284 StreamSubscription subscription; | 288 StreamSubscription subscription; |
| 285 T min = null; | 289 T min = null; |
| 286 subscription = subscribe( | 290 subscription = listen( |
| 287 onData: (T value) { | 291 (T value) { |
| 288 min = value; | 292 min = value; |
| 289 subscription.onData = (T value) { | 293 subscription.onData = (T value) { |
| 290 if (compare(min, value) > 0) min = value; | 294 if (compare(min, value) > 0) min = value; |
| 291 }; | 295 }; |
| 292 }, | 296 }, |
| 293 onError: future.setError, | 297 onError: future.setError, |
| 294 onDone: () { | 298 onDone: () { |
| 295 future._setValue(min); | 299 future._setValue(min); |
| 296 }, | 300 }, |
| 297 unsubscribeOnError: true | 301 unsubscribeOnError: true |
| 298 ); | 302 ); |
| 299 } | 303 } |
| 300 | 304 |
| 301 /** | 305 /** |
| 302 * Finds the least element in the stream. | 306 * Finds the least element in the stream. |
| 303 * | 307 * |
| 304 * If the stream is emtpy, the result is [:null:]. | 308 * If the stream is emtpy, the result is [:null:]. |
| 305 * Otherwise the result is an value from the stream that is not greater | 309 * Otherwise the result is an value from the stream that is not greater |
| 306 * than any other value from the stream (according to [compare], which must | 310 * than any other value from the stream (according to [compare], which must |
| 307 * be a [Comparator]). | 311 * be a [Comparator]). |
| 308 * | 312 * |
| 309 * If [compare] is omitted, it defaults to [Comparable.compare]. | 313 * If [compare] is omitted, it defaults to [Comparable.compare]. |
| 310 */ | 314 */ |
| 311 Future<T> max([int compare(T a, T b)]) { | 315 Future<T> max([int compare(T a, T b)]) { |
| 312 _FutureImpl<T> future = new _FutureImpl<T>(); | 316 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 313 StreamSubscription subscription; | 317 StreamSubscription subscription; |
| 314 T max = null; | 318 T max = null; |
| 315 subscription = subscribe( | 319 subscription = listen( |
| 316 onData: (T value) { | 320 (T value) { |
| 317 max = value; | 321 max = value; |
| 318 subscription.onData = (T value) { | 322 subscription.onData = (T value) { |
| 319 if (compare(max, value) < 0) max = value; | 323 if (compare(max, value) < 0) max = value; |
| 320 }; | 324 }; |
| 321 }, | 325 }, |
| 322 onError: future.setError, | 326 onError: future.setError, |
| 323 onDone: () { | 327 onDone: () { |
| 324 future._setValue(max); | 328 future._setValue(max); |
| 325 }, | 329 }, |
| 326 unsubscribeOnError: true | 330 unsubscribeOnError: true |
| 327 ); | 331 ); |
| 328 } | 332 } |
| 329 | 333 |
| 330 /** Reports whether this stream contains any elements. */ | 334 /** Reports whether this stream contains any elements. */ |
| 331 Future<bool> get isEmpty { | 335 Future<bool> get isEmpty { |
| 332 _FutureImpl<bool> future = new _FutureImpl<bool>(); | 336 _FutureImpl<bool> future = new _FutureImpl<bool>(); |
| 333 StreamSubscription subscription; | 337 StreamSubscription subscription; |
| 334 subscription = subscribe( | 338 subscription = listen( |
| 335 onData: (_) { | 339 (_) { |
| 336 subscription.unsubscribe(); | 340 subscription.cancel(); |
| 337 future._setValue(false); | 341 future._setValue(false); |
| 338 }, | 342 }, |
| 339 onError: future._setError, | 343 onError: future._setError, |
| 340 onDone: () { | 344 onDone: () { |
| 341 future._setValue(true); | 345 future._setValue(true); |
| 342 }, | 346 }, |
| 343 unsubscribeOnError: true); | 347 unsubscribeOnError: true); |
| 344 return future; | 348 return future; |
| 345 } | 349 } |
| 346 | 350 |
| 347 /** Collect the data of this stream in a [List]. */ | 351 /** Collect the data of this stream in a [List]. */ |
| 348 Future<List<T>> toList() { | 352 Future<List<T>> toList() { |
| 349 List<T> result = <T>[]; | 353 List<T> result = <T>[]; |
| 350 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); | 354 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); |
| 351 subscribe( | 355 listen( |
| 352 onData: (T data) { | 356 (T data) { |
| 353 result.add(data); | 357 result.add(data); |
| 354 }, | 358 }, |
| 355 onError: future._setError, | 359 onError: future._setError, |
| 356 onDone: () { | 360 onDone: () { |
| 357 future._setValue(result); | 361 future._setValue(result); |
| 358 }, | 362 }, |
| 359 unsubscribeOnError: true); | 363 unsubscribeOnError: true); |
| 360 return future; | 364 return future; |
| 361 } | 365 } |
| 362 | 366 |
| 363 /** Collect the data of this stream in a [Set]. */ | 367 /** Collect the data of this stream in a [Set]. */ |
| 364 Future<Set<T>> toSet() { | 368 Future<Set<T>> toSet() { |
| 365 Set<T> result = new Set<T>(); | 369 Set<T> result = new Set<T>(); |
| 366 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); | 370 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); |
| 367 subscribe( | 371 listen( |
| 368 onData: (T data) { | 372 (T data) { |
| 369 result.add(data); | 373 result.add(data); |
| 370 }, | 374 }, |
| 371 onError: future._setError, | 375 onError: future._setError, |
| 372 onDone: () { | 376 onDone: () { |
| 373 future._setValue(result); | 377 future._setValue(result); |
| 374 }, | 378 }, |
| 375 unsubscribeOnError: true); | 379 unsubscribeOnError: true); |
| 376 return future; | 380 return future; |
| 377 } | 381 } |
| 378 | 382 |
| (...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 435 | 439 |
| 436 /** | 440 /** |
| 437 * Returns the first element. | 441 * Returns the first element. |
| 438 * | 442 * |
| 439 * If [this] is empty throws a [StateError]. Otherwise this method is | 443 * If [this] is empty throws a [StateError]. Otherwise this method is |
| 440 * equivalent to [:this.elementAt(0):] | 444 * equivalent to [:this.elementAt(0):] |
| 441 */ | 445 */ |
| 442 Future<T> get first { | 446 Future<T> get first { |
| 443 _FutureImpl<T> future = new _FutureImpl(); | 447 _FutureImpl<T> future = new _FutureImpl(); |
| 444 StreamSubscription subscription; | 448 StreamSubscription subscription; |
| 445 subscription = subscribe( | 449 subscription = listen( |
| 446 onData: (T value) { | 450 (T value) { |
| 447 future._setValue(value); | 451 future._setValue(value); |
| 448 subscription.unsubscribe(); | 452 subscription.cancel(); |
| 449 return; | 453 return; |
| 450 }, | 454 }, |
| 451 onError: future._setError, | 455 onError: future._setError, |
| 452 onDone: () { | 456 onDone: () { |
| 453 future._setError(new AsyncError(new StateError("No elements"))); | 457 future._setError(new AsyncError(new StateError("No elements"))); |
| 454 }, | 458 }, |
| 455 unsubscribeOnError: true); | 459 unsubscribeOnError: true); |
| 456 return future; | 460 return future; |
| 457 } | 461 } |
| 458 | 462 |
| 459 /** | 463 /** |
| 460 * Returns the last element. | 464 * Returns the last element. |
| 461 * | 465 * |
| 462 * If [this] is empty throws a [StateError]. | 466 * If [this] is empty throws a [StateError]. |
| 463 */ | 467 */ |
| 464 Future<T> get last { | 468 Future<T> get last { |
| 465 _FutureImpl<T> future = new _FutureImpl<T>(); | 469 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 466 T result = null; | 470 T result = null; |
| 467 bool foundResult = false; | 471 bool foundResult = false; |
| 468 StreamSubscription subscription; | 472 StreamSubscription subscription; |
| 469 subscription = subscribe( | 473 subscription = listen( |
| 470 onData: (T value) { | 474 (T value) { |
| 471 foundResult = true; | 475 foundResult = true; |
| 472 result = value; | 476 result = value; |
| 473 }, | 477 }, |
| 474 onError: future._setError, | 478 onError: future._setError, |
| 475 onDone: () { | 479 onDone: () { |
| 476 if (foundResult) { | 480 if (foundResult) { |
| 477 future._setValue(result); | 481 future._setValue(result); |
| 478 return; | 482 return; |
| 479 } | 483 } |
| 480 future._setError(new AsyncError(new StateError("No elements"))); | 484 future._setError(new AsyncError(new StateError("No elements"))); |
| 481 }, | 485 }, |
| 482 unsubscribeOnError: true); | 486 unsubscribeOnError: true); |
| 483 return future; | 487 return future; |
| 484 } | 488 } |
| 485 | 489 |
| 486 /** | 490 /** |
| 487 * Returns the single element. | 491 * Returns the single element. |
| 488 * | 492 * |
| 489 * If [this] is empty or has more than one element throws a [StateError]. | 493 * If [this] is empty or has more than one element throws a [StateError]. |
| 490 */ | 494 */ |
| 491 Future<T> get single { | 495 Future<T> get single { |
| 492 _FutureImpl<T> future = new _FutureImpl<T>(); | 496 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 493 T result = null; | 497 T result = null; |
| 494 bool foundResult = false; | 498 bool foundResult = false; |
| 495 StreamSubscription subscription; | 499 StreamSubscription subscription; |
| 496 subscription = subscribe( | 500 subscription = listen( |
| 497 onData: (T value) { | 501 (T value) { |
| 498 if (foundResult) { | 502 if (foundResult) { |
| 499 // This is the second element we get. | 503 // This is the second element we get. |
| 500 Error error = new StateError("More than one element"); | 504 Error error = new StateError("More than one element"); |
| 501 future._setError(new AsyncError(error)); | 505 future._setError(new AsyncError(error)); |
| 502 subscription.unsubscribe(); | 506 subscription.cancel(); |
| 503 return; | 507 return; |
| 504 } | 508 } |
| 505 foundResult = true; | 509 foundResult = true; |
| 506 result = value; | 510 result = value; |
| 507 }, | 511 }, |
| 508 onError: future._setError, | 512 onError: future._setError, |
| 509 onDone: () { | 513 onDone: () { |
| 510 if (foundResult) { | 514 if (foundResult) { |
| 511 future._setValue(result); | 515 future._setValue(result); |
| 512 return; | 516 return; |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 527 * [defaultValue] function is provided, the result of calling [defaultValue] | 531 * [defaultValue] function is provided, the result of calling [defaultValue] |
| 528 * becomes the value of the future. | 532 * becomes the value of the future. |
| 529 * | 533 * |
| 530 * If an error occurs, or if this stream ends without finding a match and | 534 * If an error occurs, or if this stream ends without finding a match and |
| 531 * with no [defaultValue] function provided, the future will receive an | 535 * with no [defaultValue] function provided, the future will receive an |
| 532 * error. | 536 * error. |
| 533 */ | 537 */ |
| 534 Future<T> firstMatching(bool test(T value), {T defaultValue()}) { | 538 Future<T> firstMatching(bool test(T value), {T defaultValue()}) { |
| 535 _FutureImpl<T> future = new _FutureImpl<T>(); | 539 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 536 StreamSubscription subscription; | 540 StreamSubscription subscription; |
| 537 subscription = subscribe( | 541 subscription = listen( |
| 538 onData: (T value) { | 542 (T value) { |
| 539 bool matches; | 543 bool matches; |
| 540 try { | 544 try { |
| 541 matches = (true == test(value)); | 545 matches = (true == test(value)); |
| 542 } catch (e, s) { | 546 } catch (e, s) { |
| 543 future._setError(new AsyncError(e, s)); | 547 future._setError(new AsyncError(e, s)); |
| 544 subscription.unsubscribe(); | 548 subscription.cancel(); |
| 545 return; | 549 return; |
| 546 } | 550 } |
| 547 if (matches) { | 551 if (matches) { |
| 548 future._setValue(value); | 552 future._setValue(value); |
| 549 subscription.unsubscribe(); | 553 subscription.cancel(); |
| 550 } | 554 } |
| 551 }, | 555 }, |
| 552 onError: future._setError, | 556 onError: future._setError, |
| 553 onDone: () { | 557 onDone: () { |
| 554 if (defaultValue != null) { | 558 if (defaultValue != null) { |
| 555 T value; | 559 T value; |
| 556 try { | 560 try { |
| 557 value = defaultValue(); | 561 value = defaultValue(); |
| 558 } catch (e, s) { | 562 } catch (e, s) { |
| 559 future._setError(new AsyncError(e, s)); | 563 future._setError(new AsyncError(e, s)); |
| (...skipping 14 matching lines...) Expand all Loading... | |
| 574 * | 578 * |
| 575 * As [firstMatching], except that the last matching element is found. | 579 * As [firstMatching], except that the last matching element is found. |
| 576 * That means that the result cannot be provided before this stream | 580 * That means that the result cannot be provided before this stream |
| 577 * is done. | 581 * is done. |
| 578 */ | 582 */ |
| 579 Future<T> lastMatching(bool test(T value), {T defaultValue()}) { | 583 Future<T> lastMatching(bool test(T value), {T defaultValue()}) { |
| 580 _FutureImpl<T> future = new _FutureImpl<T>(); | 584 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 581 T result = null; | 585 T result = null; |
| 582 bool foundResult = false; | 586 bool foundResult = false; |
| 583 StreamSubscription subscription; | 587 StreamSubscription subscription; |
| 584 subscription = subscribe( | 588 subscription = listen( |
| 585 onData: (T value) { | 589 (T value) { |
| 586 bool matches; | 590 bool matches; |
| 587 try { | 591 try { |
| 588 matches = (true == test(value)); | 592 matches = (true == test(value)); |
| 589 } catch (e, s) { | 593 } catch (e, s) { |
| 590 future._setError(new AsyncError(e, s)); | 594 future._setError(new AsyncError(e, s)); |
| 591 subscription.unsubscribe(); | 595 subscription.cancel(); |
| 592 return; | 596 return; |
| 593 } | 597 } |
| 594 if (matches) { | 598 if (matches) { |
| 595 foundResult = true; | 599 foundResult = true; |
| 596 result = value; | 600 result = value; |
| 597 } | 601 } |
| 598 }, | 602 }, |
| 599 onError: future._setError, | 603 onError: future._setError, |
| 600 onDone: () { | 604 onDone: () { |
| 601 if (foundResult) { | 605 if (foundResult) { |
| (...skipping 22 matching lines...) Expand all Loading... | |
| 624 * Finds the single element in this stream matching [test]. | 628 * Finds the single element in this stream matching [test]. |
| 625 * | 629 * |
| 626 * Like [lastMatch], except that it is an error if more than one | 630 * Like [lastMatch], except that it is an error if more than one |
| 627 * matching element occurs in the stream. | 631 * matching element occurs in the stream. |
| 628 */ | 632 */ |
| 629 Future<T> singleMatching(bool test(T value)) { | 633 Future<T> singleMatching(bool test(T value)) { |
| 630 _FutureImpl<T> future = new _FutureImpl<T>(); | 634 _FutureImpl<T> future = new _FutureImpl<T>(); |
| 631 T result = null; | 635 T result = null; |
| 632 bool foundResult = false; | 636 bool foundResult = false; |
| 633 StreamSubscription subscription; | 637 StreamSubscription subscription; |
| 634 subscription = subscribe( | 638 subscription = listen( |
| 635 onData: (T value) { | 639 (T value) { |
| 636 bool matches; | 640 bool matches; |
| 637 try { | 641 try { |
| 638 matches = (true == test(value)); | 642 matches = (true == test(value)); |
| 639 } catch (e, s) { | 643 } catch (e, s) { |
| 640 future._setError(new AsyncError(e, s)); | 644 future._setError(new AsyncError(e, s)); |
| 641 subscription.unsubscribe(); | 645 subscription.cancel(); |
| 642 return; | 646 return; |
| 643 } | 647 } |
| 644 if (matches) { | 648 if (matches) { |
| 645 if (foundResult) { | 649 if (foundResult) { |
| 646 future._setError(new AsyncError( | 650 future._setError(new AsyncError( |
| 647 new StateError('Multiple matches for "single"'))); | 651 new StateError('Multiple matches for "single"'))); |
| 648 subscription.unsubscribe(); | 652 subscription.cancel(); |
| 649 return; | 653 return; |
| 650 } | 654 } |
| 651 foundResult = true; | 655 foundResult = true; |
| 652 result = value; | 656 result = value; |
| 653 } | 657 } |
| 654 }, | 658 }, |
| 655 onError: future._setError, | 659 onError: future._setError, |
| 656 onDone: () { | 660 onDone: () { |
| 657 if (foundResult) { | 661 if (foundResult) { |
| 658 future._setValue(result); | 662 future._setValue(result); |
| (...skipping 10 matching lines...) Expand all Loading... | |
| 669 * Returns the value of the [index]th data event of this stream. | 673 * Returns the value of the [index]th data event of this stream. |
| 670 * | 674 * |
| 671 * If an error event occurs, the future will end with this error. | 675 * If an error event occurs, the future will end with this error. |
| 672 * | 676 * |
| 673 * If this stream provides fewer than [index] elements before closing, | 677 * If this stream provides fewer than [index] elements before closing, |
| 674 * an error is reported. | 678 * an error is reported. |
| 675 */ | 679 */ |
| 676 Future<T> elementAt(int index) { | 680 Future<T> elementAt(int index) { |
| 677 _FutureImpl<T> future = new _FutureImpl(); | 681 _FutureImpl<T> future = new _FutureImpl(); |
| 678 StreamSubscription subscription; | 682 StreamSubscription subscription; |
| 679 subscription = subscribe( | 683 subscription = listen( |
| 680 onData: (T value) { | 684 (T value) { |
| 681 if (index == 0) { | 685 if (index == 0) { |
| 682 future._setValue(value); | 686 future._setValue(value); |
| 683 subscription.unsubscribe(); | 687 subscription.cancel(); |
| 684 return; | 688 return; |
| 685 } | 689 } |
| 686 index -= 1; | 690 index -= 1; |
| 687 }, | 691 }, |
| 688 onError: future._setError, | 692 onError: future._setError, |
| 689 onDone: () { | 693 onDone: () { |
| 690 future._setError(new AsyncError( | 694 future._setError(new AsyncError( |
| 691 new StateError("Not enough elements for elementAt"))); | 695 new StateError("Not enough elements for elementAt"))); |
| 692 }, | 696 }, |
| 693 unsubscribeOnError: true); | 697 unsubscribeOnError: true); |
| 694 return future; | 698 return future; |
| 695 } | 699 } |
| 696 } | 700 } |
| 697 | 701 |
| 698 /** | 702 /** |
| 699 * A control object for the subscription on a [Stream]. | 703 * A control object for the subscription on a [Stream]. |
| 700 * | 704 * |
| 701 * When you subscribe on a [Stream] using [Stream.subscribe], | 705 * When you subscribe on a [Stream] using [Stream.subscribe], |
| 702 * a [StreamSubscription] object is returned. This object | 706 * a [StreamSubscription] object is returned. This object |
| 703 * is used to later unsubscribe again, or to temporarily pause | 707 * is used to later unsubscribe again, or to temporarily pause |
| 704 * the stream's events. | 708 * the stream's events. |
| 705 */ | 709 */ |
| 706 abstract class StreamSubscription<T> { | 710 abstract class StreamSubscription<T> { |
| 707 /** | 711 /** |
| 708 * Cancels this subscription. It will no longer receive events. | 712 * Cancels this subscription. It will no longer receive events. |
| 709 * | 713 * |
| 710 * If an event is currently firing, this unsubscription will only | 714 * If an event is currently firing, this unsubscription will only |
| 711 * take effect after all subscribers have received the current event. | 715 * take effect after all subscribers have received the current event. |
| 712 */ | 716 */ |
| 713 void unsubscribe(); | 717 void cancel(); |
| 714 | 718 |
| 715 /** Set or override the data event handler of this subscription. */ | 719 /** Set or override the data event handler of this subscription. */ |
| 716 void onData(void handleData(T data)); | 720 void onData(void handleData(T data)); |
| 717 | 721 |
| 718 /** Set or override the error event handler of this subscription. */ | 722 /** Set or override the error event handler of this subscription. */ |
| 719 void onError(void handleError(AsyncError error)); | 723 void onError(void handleError(AsyncError error)); |
| 720 | 724 |
| 721 /** Set or override the done event handler of this subscription. */ | 725 /** Set or override the done event handler of this subscription. */ |
| 722 void onDone(void handleDone()); | 726 void onDone(void handleDone()); |
| 723 | 727 |
| (...skipping 25 matching lines...) Expand all Loading... | |
| 749 void signalError(AsyncError errorEvent); | 753 void signalError(AsyncError errorEvent); |
| 750 void close(); | 754 void close(); |
| 751 } | 755 } |
| 752 | 756 |
| 753 /** [Stream] wrapper that only exposes the [Stream] interface. */ | 757 /** [Stream] wrapper that only exposes the [Stream] interface. */ |
| 754 class StreamView<T> extends Stream<T> { | 758 class StreamView<T> extends Stream<T> { |
| 755 Stream<T> _stream; | 759 Stream<T> _stream; |
| 756 | 760 |
| 757 StreamView(this._stream); | 761 StreamView(this._stream); |
| 758 | 762 |
| 759 StreamSubscription<T> subscribe({void onData(T value), | 763 StreamSubscription<T> listen(void onData(T value), |
| 760 void onError(AsyncError error), | 764 { void onError(AsyncError error), |
| 761 void onDone(), | 765 void onDone(), |
| 762 bool unsubscribeOnError}) { | 766 bool unsubscribeOnError}) { |
| 763 return _stream.subscribe(onData: onData, onError: onError, onDone: onDone, | 767 return _stream.listen(onData, onError: onError, onDone: onDone, |
| 764 unsubscribeOnError: unsubscribeOnError); | 768 unsubscribeOnError: unsubscribeOnError); |
| 765 } | 769 } |
| 766 } | 770 } |
| 767 | 771 |
| 768 /** | 772 /** |
| 769 * [StreamSink] wrapper that only exposes the [StreamSink] interface. | 773 * [StreamSink] wrapper that only exposes the [StreamSink] interface. |
| 770 */ | 774 */ |
| 771 class StreamSinkView<T> implements StreamSink<T> { | 775 class StreamSinkView<T> implements StreamSink<T> { |
| 772 final StreamSink<T> _sink; | 776 final StreamSink<T> _sink; |
| 773 | 777 |
| (...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 841 sink.signalError(error); | 845 sink.signalError(error); |
| 842 } | 846 } |
| 843 | 847 |
| 844 /** | 848 /** |
| 845 * Handle an incoming done event. | 849 * Handle an incoming done event. |
| 846 */ | 850 */ |
| 847 void handleDone(StreamSink<T> sink) { | 851 void handleDone(StreamSink<T> sink) { |
| 848 sink.close(); | 852 sink.close(); |
| 849 } | 853 } |
| 850 } | 854 } |
| OLD | NEW |