Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(305)

Side by Side Diff: sdk/lib/async/stream.dart

Issue 11740027: Rename unsubscribe to cancel. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Fix error message. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // part of dart.async; 5 // part of dart.async;
6 6
7 // ------------------------------------------------------------------- 7 // -------------------------------------------------------------------
8 // Core Stream types 8 // Core Stream types
9 // ------------------------------------------------------------------- 9 // -------------------------------------------------------------------
10 10
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
55 * is called. If [onData] is null, nothing happens. 55 * is called. If [onData] is null, nothing happens.
56 * 56 *
57 * On errors from this stream, the [onError] handler is given a 57 * On errors from this stream, the [onError] handler is given a
58 * [AsyncError] object describing the error. 58 * [AsyncError] object describing the error.
59 * 59 *
60 * If this stream closes, the [onDone] handler is called. 60 * If this stream closes, the [onDone] handler is called.
61 * 61 *
62 * If [unsubscribeOnError] is true, the subscription is ended when 62 * If [unsubscribeOnError] is true, the subscription is ended when
63 * the first error is reported. The default is false. 63 * the first error is reported. The default is false.
64 */ 64 */
65 StreamSubscription<T> subscribe({void onData(T event), 65 StreamSubscription<T> listen(void onData(T event),
66 void onError(AsyncError error), 66 { void onError(AsyncError error),
67 void onDone(), 67 void onDone(),
68 bool unsubscribeOnError}); 68 bool unsubscribeOnError});
69 69
70 /** 70 /**
71 * Creates a new stream from this stream that discards some data events. 71 * Creates a new stream from this stream that discards some data events.
72 * 72 *
73 * The new stream sends the same error and done events as this stream, 73 * The new stream sends the same error and done events as this stream,
74 * but it only sends the data events that satisfy the [test]. 74 * but it only sends the data events that satisfy the [test].
75 */ 75 */
76 Stream<T> where(bool test(T event)) { 76 Stream<T> where(bool test(T event)) {
77 return this.chain(new WhereStream<T>(test)); 77 return this.chain(new WhereStream<T>(test));
78 } 78 }
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 */ 142 */
143 Stream transform(StreamTransformer transformer) { 143 Stream transform(StreamTransformer transformer) {
144 return this.chain(new TransformStream(transformer)); 144 return this.chain(new TransformStream(transformer));
145 } 145 }
146 146
147 147
148 /** Reduces a sequence of values by repeatedly applying [combine]. */ 148 /** Reduces a sequence of values by repeatedly applying [combine]. */
149 Future reduce(var initialValue, combine(var previous, T element)) { 149 Future reduce(var initialValue, combine(var previous, T element)) {
150 Completer completer = new Completer(); 150 Completer completer = new Completer();
151 var value = initialValue; 151 var value = initialValue;
152 StreamSubscription subscription = this.subscribe(unsubscribeOnError: true); 152 StreamSubscription subscription;
153 subscription..onData((T element) { 153 subscription = this.listen(
154 try { 154 (T element) {
155 value = combine(value, element); 155 try {
156 } catch (e, s) { 156 value = combine(value, element);
157 subscription.unsubscribe(); 157 } catch (e, s) {
158 completer.completeError(e, s); 158 subscription.cancel();
159 } 159 completer.completeError(e, s);
160 })..onError((AsyncError e) { 160 }
161 completer.completeError(e.error, e.stackTrace); 161 },
162 })..onDone(() { 162 unsubscribeOnError: true,
163 completer.complete(value); 163 onError: (AsyncError e) {
164 }); 164 completer.completeError(e.error, e.stackTrace);
165 },
166 onDone: () {
167 completer.complete(value);
168 });
165 return completer.future; 169 return completer.future;
166 } 170 }
167 171
168 // Deprecated method, previously called 'pipe', retained for compatibility. 172 // Deprecated method, previously called 'pipe', retained for compatibility.
169 Signal pipeInto(Sink<T> sink, 173 Signal pipeInto(Sink<T> sink,
170 {void onError(AsyncError error), 174 {void onError(AsyncError error),
171 bool unsubscribeOnError}) { 175 bool unsubscribeOnError}) {
172 SignalCompleter completer = new SignalCompleter(); 176 SignalCompleter completer = new SignalCompleter();
173 this.subscribe( 177 this.listen(
174 onData: sink.add, 178 sink.add,
175 onError: onError, 179 onError: onError,
176 onDone: () { 180 onDone: () {
177 sink.close(); 181 sink.close();
178 completer.complete(); 182 completer.complete();
179 }, 183 },
180 unsubscribeOnError: unsubscribeOnError); 184 unsubscribeOnError: unsubscribeOnError);
181 return completer.signal; 185 return completer.signal;
182 } 186 }
183 187
184 188
185 /** 189 /**
186 * Check whether [match] occurs in the elements provided by this stream. 190 * Check whether [match] occurs in the elements provided by this stream.
187 * 191 *
188 * Completes the [Future] when the answer is known. 192 * Completes the [Future] when the answer is known.
189 * If this stream reports an error, the [Future] will report that error. 193 * If this stream reports an error, the [Future] will report that error.
190 */ 194 */
191 Future<bool> contains(T match) { 195 Future<bool> contains(T match) {
192 _FutureImpl<bool> future = new _FutureImpl<bool>(); 196 _FutureImpl<bool> future = new _FutureImpl<bool>();
193 StreamSubscription subscription; 197 StreamSubscription subscription;
194 subscription = subscribe( 198 subscription = listen(
Lasse Reichstein Nielsen 2013/01/04 08:17:55 Not your code (probably mine), but ... This uses "
floitsch 2013/01/04 15:51:36 Added this. here at the other use-sites.
195 onData: (T element) { 199 (T element) {
196 if (element == match) { 200 if (element == match) {
197 subscription.unsubscribe(); 201 subscription.cancel();
198 future._setValue(true); 202 future._setValue(true);
199 } 203 }
200 }, 204 },
201 onError: future._setError, 205 onError: future._setError,
202 onDone: () { 206 onDone: () {
203 future._setValue(false); 207 future._setValue(false);
204 }, 208 },
205 unsubscribeOnError: true); 209 unsubscribeOnError: true);
206 return future; 210 return future;
207 } 211 }
208 212
209 /** 213 /**
210 * Check whether [test] accepts all elements provided by this stream. 214 * Check whether [test] accepts all elements provided by this stream.
211 * 215 *
212 * Completes the [Future] when the answer is known. 216 * Completes the [Future] when the answer is known.
213 * If this stream reports an error, the [Future] will report that error. 217 * If this stream reports an error, the [Future] will report that error.
214 */ 218 */
215 Future<bool> every(bool test(T element)) { 219 Future<bool> every(bool test(T element)) {
216 _FutureImpl<bool> future = new _FutureImpl<bool>(); 220 _FutureImpl<bool> future = new _FutureImpl<bool>();
217 StreamSubscription subscription; 221 StreamSubscription subscription;
218 subscription = subscribe( 222 subscription = listen(
219 onData: (T element) { 223 (T element) {
220 if (!test(element)) { 224 if (!test(element)) {
221 subscription.unsubscribe(); 225 subscription.cancel();
222 future._setValue(false); 226 future._setValue(false);
223 } 227 }
224 }, 228 },
225 onError: future._setError, 229 onError: future._setError,
226 onDone: () { 230 onDone: () {
227 future._setValue(true); 231 future._setValue(true);
228 }, 232 },
229 unsubscribeOnError: true); 233 unsubscribeOnError: true);
230 return future; 234 return future;
231 } 235 }
232 236
233 /** 237 /**
234 * Check whether [test] accepts any element provided by this stream. 238 * Check whether [test] accepts any element provided by this stream.
235 * 239 *
236 * Completes the [Future] when the answer is known. 240 * Completes the [Future] when the answer is known.
237 * If this stream reports an error, the [Future] will report that error. 241 * If this stream reports an error, the [Future] will report that error.
238 */ 242 */
239 Future<bool> any(bool test(T element)) { 243 Future<bool> any(bool test(T element)) {
240 _FutureImpl<bool> future = new _FutureImpl<bool>(); 244 _FutureImpl<bool> future = new _FutureImpl<bool>();
241 StreamSubscription subscription; 245 StreamSubscription subscription;
242 subscription = subscribe( 246 subscription = listen(
243 onData: (T element) { 247 (T element) {
244 if (test(element)) { 248 if (test(element)) {
245 subscription.unsubscribe(); 249 subscription.cancel();
246 future._setValue(true); 250 future._setValue(true);
247 } 251 }
248 }, 252 },
249 onError: future._setError, 253 onError: future._setError,
250 onDone: () { 254 onDone: () {
251 future._setValue(false); 255 future._setValue(false);
252 }, 256 },
253 unsubscribeOnError: true); 257 unsubscribeOnError: true);
254 return future; 258 return future;
255 } 259 }
256 260
257 261
258 /** Counts the elements in the stream. */ 262 /** Counts the elements in the stream. */
259 Future<int> get length { 263 Future<int> get length {
260 _FutureImpl<int> future = new _FutureImpl<int>(); 264 _FutureImpl<int> future = new _FutureImpl<int>();
261 int count = 0; 265 int count = 0;
262 subscribe( 266 listen(
263 onData: (_) { count++; }, 267 (_) { count++; },
264 onError: future._setError, 268 onError: future._setError,
265 onDone: () { 269 onDone: () {
266 future._setValue(count); 270 future._setValue(count);
267 }, 271 },
268 unsubscribeOnError: true); 272 unsubscribeOnError: true);
269 return future; 273 return future;
270 } 274 }
271 275
272 /** 276 /**
273 * Finds the least element in the stream. 277 * Finds the least element in the stream.
274 * 278 *
275 * If the stream is empty, the result is [:null:]. 279 * If the stream is empty, the result is [:null:].
276 * Otherwise the result is a value from the stream that is not greater 280 * Otherwise the result is a value from the stream that is not greater
277 * than any other value from the stream (according to [compare], which must 281 * than any other value from the stream (according to [compare], which must
278 * be a [Comparator]). 282 * be a [Comparator]).
279 * 283 *
280 * If [compare] is omitted, it defaults to [Comparable.compare]. 284 * If [compare] is omitted, it defaults to [Comparable.compare].
281 */ 285 */
282 Future<T> min([int compare(T a, T b)]) { 286 Future<T> min([int compare(T a, T b)]) {
283 _FutureImpl<T> future = new _FutureImpl<T>(); 287 _FutureImpl<T> future = new _FutureImpl<T>();
284 StreamSubscription subscription; 288 StreamSubscription subscription;
285 T min = null; 289 T min = null;
286 subscription = subscribe( 290 subscription = listen(
287 onData: (T value) { 291 (T value) {
288 min = value; 292 min = value;
289 subscription.onData = (T value) { 293 subscription.onData = (T value) {
290 if (compare(min, value) > 0) min = value; 294 if (compare(min, value) > 0) min = value;
291 }; 295 };
292 }, 296 },
293 onError: future.setError, 297 onError: future.setError,
294 onDone: () { 298 onDone: () {
295 future._setValue(min); 299 future._setValue(min);
296 }, 300 },
297 unsubscribeOnError: true 301 unsubscribeOnError: true
298 ); 302 );
299 } 303 }
300 304
301 /** 305 /**
302 * Finds the least element in the stream. 306 * Finds the least element in the stream.
303 * 307 *
304 * If the stream is emtpy, the result is [:null:]. 308 * If the stream is emtpy, the result is [:null:].
305 * Otherwise the result is an value from the stream that is not greater 309 * Otherwise the result is an value from the stream that is not greater
306 * than any other value from the stream (according to [compare], which must 310 * than any other value from the stream (according to [compare], which must
307 * be a [Comparator]). 311 * be a [Comparator]).
308 * 312 *
309 * If [compare] is omitted, it defaults to [Comparable.compare]. 313 * If [compare] is omitted, it defaults to [Comparable.compare].
310 */ 314 */
311 Future<T> max([int compare(T a, T b)]) { 315 Future<T> max([int compare(T a, T b)]) {
312 _FutureImpl<T> future = new _FutureImpl<T>(); 316 _FutureImpl<T> future = new _FutureImpl<T>();
313 StreamSubscription subscription; 317 StreamSubscription subscription;
314 T max = null; 318 T max = null;
315 subscription = subscribe( 319 subscription = listen(
316 onData: (T value) { 320 (T value) {
317 max = value; 321 max = value;
318 subscription.onData = (T value) { 322 subscription.onData = (T value) {
319 if (compare(max, value) < 0) max = value; 323 if (compare(max, value) < 0) max = value;
320 }; 324 };
321 }, 325 },
322 onError: future.setError, 326 onError: future.setError,
323 onDone: () { 327 onDone: () {
324 future._setValue(max); 328 future._setValue(max);
325 }, 329 },
326 unsubscribeOnError: true 330 unsubscribeOnError: true
327 ); 331 );
328 } 332 }
329 333
330 /** Reports whether this stream contains any elements. */ 334 /** Reports whether this stream contains any elements. */
331 Future<bool> get isEmpty { 335 Future<bool> get isEmpty {
332 _FutureImpl<bool> future = new _FutureImpl<bool>(); 336 _FutureImpl<bool> future = new _FutureImpl<bool>();
333 StreamSubscription subscription; 337 StreamSubscription subscription;
334 subscription = subscribe( 338 subscription = listen(
335 onData: (_) { 339 (_) {
336 subscription.unsubscribe(); 340 subscription.cancel();
337 future._setValue(false); 341 future._setValue(false);
338 }, 342 },
339 onError: future._setError, 343 onError: future._setError,
340 onDone: () { 344 onDone: () {
341 future._setValue(true); 345 future._setValue(true);
342 }, 346 },
343 unsubscribeOnError: true); 347 unsubscribeOnError: true);
344 return future; 348 return future;
345 } 349 }
346 350
347 /** Collect the data of this stream in a [List]. */ 351 /** Collect the data of this stream in a [List]. */
348 Future<List<T>> toList() { 352 Future<List<T>> toList() {
349 List<T> result = <T>[]; 353 List<T> result = <T>[];
350 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>(); 354 _FutureImpl<List<T>> future = new _FutureImpl<List<T>>();
351 subscribe( 355 listen(
352 onData: (T data) { 356 (T data) {
353 result.add(data); 357 result.add(data);
354 }, 358 },
355 onError: future._setError, 359 onError: future._setError,
356 onDone: () { 360 onDone: () {
357 future._setValue(result); 361 future._setValue(result);
358 }, 362 },
359 unsubscribeOnError: true); 363 unsubscribeOnError: true);
360 return future; 364 return future;
361 } 365 }
362 366
363 /** Collect the data of this stream in a [Set]. */ 367 /** Collect the data of this stream in a [Set]. */
364 Future<Set<T>> toSet() { 368 Future<Set<T>> toSet() {
365 Set<T> result = new Set<T>(); 369 Set<T> result = new Set<T>();
366 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>(); 370 _FutureImpl<Set<T>> future = new _FutureImpl<Set<T>>();
367 subscribe( 371 listen(
368 onData: (T data) { 372 (T data) {
369 result.add(data); 373 result.add(data);
370 }, 374 },
371 onError: future._setError, 375 onError: future._setError,
372 onDone: () { 376 onDone: () {
373 future._setValue(result); 377 future._setValue(result);
374 }, 378 },
375 unsubscribeOnError: true); 379 unsubscribeOnError: true);
376 return future; 380 return future;
377 } 381 }
378 382
(...skipping 56 matching lines...) Expand 10 before | Expand all | Expand 10 after
435 439
436 /** 440 /**
437 * Returns the first element. 441 * Returns the first element.
438 * 442 *
439 * If [this] is empty throws a [StateError]. Otherwise this method is 443 * If [this] is empty throws a [StateError]. Otherwise this method is
440 * equivalent to [:this.elementAt(0):] 444 * equivalent to [:this.elementAt(0):]
441 */ 445 */
442 Future<T> get first { 446 Future<T> get first {
443 _FutureImpl<T> future = new _FutureImpl(); 447 _FutureImpl<T> future = new _FutureImpl();
444 StreamSubscription subscription; 448 StreamSubscription subscription;
445 subscription = subscribe( 449 subscription = listen(
446 onData: (T value) { 450 (T value) {
447 future._setValue(value); 451 future._setValue(value);
448 subscription.unsubscribe(); 452 subscription.cancel();
449 return; 453 return;
450 }, 454 },
451 onError: future._setError, 455 onError: future._setError,
452 onDone: () { 456 onDone: () {
453 future._setError(new AsyncError(new StateError("No elements"))); 457 future._setError(new AsyncError(new StateError("No elements")));
454 }, 458 },
455 unsubscribeOnError: true); 459 unsubscribeOnError: true);
456 return future; 460 return future;
457 } 461 }
458 462
459 /** 463 /**
460 * Returns the last element. 464 * Returns the last element.
461 * 465 *
462 * If [this] is empty throws a [StateError]. 466 * If [this] is empty throws a [StateError].
463 */ 467 */
464 Future<T> get last { 468 Future<T> get last {
465 _FutureImpl<T> future = new _FutureImpl<T>(); 469 _FutureImpl<T> future = new _FutureImpl<T>();
466 T result = null; 470 T result = null;
467 bool foundResult = false; 471 bool foundResult = false;
468 StreamSubscription subscription; 472 StreamSubscription subscription;
469 subscription = subscribe( 473 subscription = listen(
470 onData: (T value) { 474 (T value) {
471 foundResult = true; 475 foundResult = true;
472 result = value; 476 result = value;
473 }, 477 },
474 onError: future._setError, 478 onError: future._setError,
475 onDone: () { 479 onDone: () {
476 if (foundResult) { 480 if (foundResult) {
477 future._setValue(result); 481 future._setValue(result);
478 return; 482 return;
479 } 483 }
480 future._setError(new AsyncError(new StateError("No elements"))); 484 future._setError(new AsyncError(new StateError("No elements")));
481 }, 485 },
482 unsubscribeOnError: true); 486 unsubscribeOnError: true);
483 return future; 487 return future;
484 } 488 }
485 489
486 /** 490 /**
487 * Returns the single element. 491 * Returns the single element.
488 * 492 *
489 * If [this] is empty or has more than one element throws a [StateError]. 493 * If [this] is empty or has more than one element throws a [StateError].
490 */ 494 */
491 Future<T> get single { 495 Future<T> get single {
492 _FutureImpl<T> future = new _FutureImpl<T>(); 496 _FutureImpl<T> future = new _FutureImpl<T>();
493 T result = null; 497 T result = null;
494 bool foundResult = false; 498 bool foundResult = false;
495 StreamSubscription subscription; 499 StreamSubscription subscription;
496 subscription = subscribe( 500 subscription = listen(
497 onData: (T value) { 501 (T value) {
498 if (foundResult) { 502 if (foundResult) {
499 // This is the second element we get. 503 // This is the second element we get.
500 Error error = new StateError("More than one element"); 504 Error error = new StateError("More than one element");
501 future._setError(new AsyncError(error)); 505 future._setError(new AsyncError(error));
502 subscription.unsubscribe(); 506 subscription.cancel();
503 return; 507 return;
504 } 508 }
505 foundResult = true; 509 foundResult = true;
506 result = value; 510 result = value;
507 }, 511 },
508 onError: future._setError, 512 onError: future._setError,
509 onDone: () { 513 onDone: () {
510 if (foundResult) { 514 if (foundResult) {
511 future._setValue(result); 515 future._setValue(result);
512 return; 516 return;
(...skipping 14 matching lines...) Expand all
527 * [defaultValue] function is provided, the result of calling [defaultValue] 531 * [defaultValue] function is provided, the result of calling [defaultValue]
528 * becomes the value of the future. 532 * becomes the value of the future.
529 * 533 *
530 * If an error occurs, or if this stream ends without finding a match and 534 * If an error occurs, or if this stream ends without finding a match and
531 * with no [defaultValue] function provided, the future will receive an 535 * with no [defaultValue] function provided, the future will receive an
532 * error. 536 * error.
533 */ 537 */
534 Future<T> firstMatching(bool test(T value), {T defaultValue()}) { 538 Future<T> firstMatching(bool test(T value), {T defaultValue()}) {
535 _FutureImpl<T> future = new _FutureImpl<T>(); 539 _FutureImpl<T> future = new _FutureImpl<T>();
536 StreamSubscription subscription; 540 StreamSubscription subscription;
537 subscription = subscribe( 541 subscription = listen(
538 onData: (T value) { 542 (T value) {
539 bool matches; 543 bool matches;
540 try { 544 try {
541 matches = (true == test(value)); 545 matches = (true == test(value));
542 } catch (e, s) { 546 } catch (e, s) {
543 future._setError(new AsyncError(e, s)); 547 future._setError(new AsyncError(e, s));
544 subscription.unsubscribe(); 548 subscription.cancel();
545 return; 549 return;
546 } 550 }
547 if (matches) { 551 if (matches) {
548 future._setValue(value); 552 future._setValue(value);
549 subscription.unsubscribe(); 553 subscription.cancel();
550 } 554 }
551 }, 555 },
552 onError: future._setError, 556 onError: future._setError,
553 onDone: () { 557 onDone: () {
554 if (defaultValue != null) { 558 if (defaultValue != null) {
555 T value; 559 T value;
556 try { 560 try {
557 value = defaultValue(); 561 value = defaultValue();
558 } catch (e, s) { 562 } catch (e, s) {
559 future._setError(new AsyncError(e, s)); 563 future._setError(new AsyncError(e, s));
(...skipping 14 matching lines...) Expand all
574 * 578 *
575 * As [firstMatching], except that the last matching element is found. 579 * As [firstMatching], except that the last matching element is found.
576 * That means that the result cannot be provided before this stream 580 * That means that the result cannot be provided before this stream
577 * is done. 581 * is done.
578 */ 582 */
579 Future<T> lastMatching(bool test(T value), {T defaultValue()}) { 583 Future<T> lastMatching(bool test(T value), {T defaultValue()}) {
580 _FutureImpl<T> future = new _FutureImpl<T>(); 584 _FutureImpl<T> future = new _FutureImpl<T>();
581 T result = null; 585 T result = null;
582 bool foundResult = false; 586 bool foundResult = false;
583 StreamSubscription subscription; 587 StreamSubscription subscription;
584 subscription = subscribe( 588 subscription = listen(
585 onData: (T value) { 589 (T value) {
586 bool matches; 590 bool matches;
587 try { 591 try {
588 matches = (true == test(value)); 592 matches = (true == test(value));
589 } catch (e, s) { 593 } catch (e, s) {
590 future._setError(new AsyncError(e, s)); 594 future._setError(new AsyncError(e, s));
591 subscription.unsubscribe(); 595 subscription.cancel();
592 return; 596 return;
593 } 597 }
594 if (matches) { 598 if (matches) {
595 foundResult = true; 599 foundResult = true;
596 result = value; 600 result = value;
597 } 601 }
598 }, 602 },
599 onError: future._setError, 603 onError: future._setError,
600 onDone: () { 604 onDone: () {
601 if (foundResult) { 605 if (foundResult) {
(...skipping 22 matching lines...) Expand all
624 * Finds the single element in this stream matching [test]. 628 * Finds the single element in this stream matching [test].
625 * 629 *
626 * Like [lastMatch], except that it is an error if more than one 630 * Like [lastMatch], except that it is an error if more than one
627 * matching element occurs in the stream. 631 * matching element occurs in the stream.
628 */ 632 */
629 Future<T> singleMatching(bool test(T value)) { 633 Future<T> singleMatching(bool test(T value)) {
630 _FutureImpl<T> future = new _FutureImpl<T>(); 634 _FutureImpl<T> future = new _FutureImpl<T>();
631 T result = null; 635 T result = null;
632 bool foundResult = false; 636 bool foundResult = false;
633 StreamSubscription subscription; 637 StreamSubscription subscription;
634 subscription = subscribe( 638 subscription = listen(
635 onData: (T value) { 639 (T value) {
636 bool matches; 640 bool matches;
637 try { 641 try {
638 matches = (true == test(value)); 642 matches = (true == test(value));
639 } catch (e, s) { 643 } catch (e, s) {
640 future._setError(new AsyncError(e, s)); 644 future._setError(new AsyncError(e, s));
641 subscription.unsubscribe(); 645 subscription.cancel();
642 return; 646 return;
643 } 647 }
644 if (matches) { 648 if (matches) {
645 if (foundResult) { 649 if (foundResult) {
646 future._setError(new AsyncError( 650 future._setError(new AsyncError(
647 new StateError('Multiple matches for "single"'))); 651 new StateError('Multiple matches for "single"')));
648 subscription.unsubscribe(); 652 subscription.cancel();
649 return; 653 return;
650 } 654 }
651 foundResult = true; 655 foundResult = true;
652 result = value; 656 result = value;
653 } 657 }
654 }, 658 },
655 onError: future._setError, 659 onError: future._setError,
656 onDone: () { 660 onDone: () {
657 if (foundResult) { 661 if (foundResult) {
658 future._setValue(result); 662 future._setValue(result);
(...skipping 10 matching lines...) Expand all
669 * Returns the value of the [index]th data event of this stream. 673 * Returns the value of the [index]th data event of this stream.
670 * 674 *
671 * If an error event occurs, the future will end with this error. 675 * If an error event occurs, the future will end with this error.
672 * 676 *
673 * If this stream provides fewer than [index] elements before closing, 677 * If this stream provides fewer than [index] elements before closing,
674 * an error is reported. 678 * an error is reported.
675 */ 679 */
676 Future<T> elementAt(int index) { 680 Future<T> elementAt(int index) {
677 _FutureImpl<T> future = new _FutureImpl(); 681 _FutureImpl<T> future = new _FutureImpl();
678 StreamSubscription subscription; 682 StreamSubscription subscription;
679 subscription = subscribe( 683 subscription = listen(
680 onData: (T value) { 684 (T value) {
681 if (index == 0) { 685 if (index == 0) {
682 future._setValue(value); 686 future._setValue(value);
683 subscription.unsubscribe(); 687 subscription.cancel();
684 return; 688 return;
685 } 689 }
686 index -= 1; 690 index -= 1;
687 }, 691 },
688 onError: future._setError, 692 onError: future._setError,
689 onDone: () { 693 onDone: () {
690 future._setError(new AsyncError( 694 future._setError(new AsyncError(
691 new StateError("Not enough elements for elementAt"))); 695 new StateError("Not enough elements for elementAt")));
692 }, 696 },
693 unsubscribeOnError: true); 697 unsubscribeOnError: true);
694 return future; 698 return future;
695 } 699 }
696 } 700 }
697 701
698 /** 702 /**
699 * A control object for the subscription on a [Stream]. 703 * A control object for the subscription on a [Stream].
700 * 704 *
701 * When you subscribe on a [Stream] using [Stream.subscribe], 705 * When you subscribe on a [Stream] using [Stream.subscribe],
702 * a [StreamSubscription] object is returned. This object 706 * a [StreamSubscription] object is returned. This object
703 * is used to later unsubscribe again, or to temporarily pause 707 * is used to later unsubscribe again, or to temporarily pause
704 * the stream's events. 708 * the stream's events.
705 */ 709 */
706 abstract class StreamSubscription<T> { 710 abstract class StreamSubscription<T> {
707 /** 711 /**
708 * Cancels this subscription. It will no longer receive events. 712 * Cancels this subscription. It will no longer receive events.
709 * 713 *
710 * If an event is currently firing, this unsubscription will only 714 * If an event is currently firing, this unsubscription will only
711 * take effect after all subscribers have received the current event. 715 * take effect after all subscribers have received the current event.
712 */ 716 */
713 void unsubscribe(); 717 void cancel();
714 718
715 /** Set or override the data event handler of this subscription. */ 719 /** Set or override the data event handler of this subscription. */
716 void onData(void handleData(T data)); 720 void onData(void handleData(T data));
717 721
718 /** Set or override the error event handler of this subscription. */ 722 /** Set or override the error event handler of this subscription. */
719 void onError(void handleError(AsyncError error)); 723 void onError(void handleError(AsyncError error));
720 724
721 /** Set or override the done event handler of this subscription. */ 725 /** Set or override the done event handler of this subscription. */
722 void onDone(void handleDone()); 726 void onDone(void handleDone());
723 727
(...skipping 25 matching lines...) Expand all
749 void signalError(AsyncError errorEvent); 753 void signalError(AsyncError errorEvent);
750 void close(); 754 void close();
751 } 755 }
752 756
753 /** [Stream] wrapper that only exposes the [Stream] interface. */ 757 /** [Stream] wrapper that only exposes the [Stream] interface. */
754 class StreamView<T> extends Stream<T> { 758 class StreamView<T> extends Stream<T> {
755 Stream<T> _stream; 759 Stream<T> _stream;
756 760
757 StreamView(this._stream); 761 StreamView(this._stream);
758 762
759 StreamSubscription<T> subscribe({void onData(T value), 763 StreamSubscription<T> listen(void onData(T value),
760 void onError(AsyncError error), 764 { void onError(AsyncError error),
761 void onDone(), 765 void onDone(),
762 bool unsubscribeOnError}) { 766 bool unsubscribeOnError}) {
763 return _stream.subscribe(onData: onData, onError: onError, onDone: onDone, 767 return _stream.listen(onData, onError: onError, onDone: onDone,
764 unsubscribeOnError: unsubscribeOnError); 768 unsubscribeOnError: unsubscribeOnError);
765 } 769 }
766 } 770 }
767 771
768 /** 772 /**
769 * [StreamSink] wrapper that only exposes the [StreamSink] interface. 773 * [StreamSink] wrapper that only exposes the [StreamSink] interface.
770 */ 774 */
771 class StreamSinkView<T> implements StreamSink<T> { 775 class StreamSinkView<T> implements StreamSink<T> {
772 final StreamSink<T> _sink; 776 final StreamSink<T> _sink;
773 777
(...skipping 67 matching lines...) Expand 10 before | Expand all | Expand 10 after
841 sink.signalError(error); 845 sink.signalError(error);
842 } 846 }
843 847
844 /** 848 /**
845 * Handle an incoming done event. 849 * Handle an incoming done event.
846 */ 850 */
847 void handleDone(StreamSink<T> sink) { 851 void handleDone(StreamSink<T> sink) {
848 sink.close(); 852 sink.close();
849 } 853 }
850 } 854 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698