Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(178)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1841223002: Fix most strong mode warnings. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 4 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/stream_group.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 import 'dart:async'; 5 import 'dart:async';
6 import 'dart:collection'; 6 import 'dart:collection';
7 7
8 import "result.dart"; 8 import "result.dart";
9 import "subscription_stream.dart"; 9 import "subscription_stream.dart";
10 import "stream_completer.dart"; 10 import "stream_completer.dart";
(...skipping 76 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 87
88 /// Queue of events not used by a request yet. 88 /// Queue of events not used by a request yet.
89 final Queue<Result> _eventQueue = new Queue(); 89 final Queue<Result> _eventQueue = new Queue();
90 90
91 /// Queue of pending requests. 91 /// Queue of pending requests.
92 /// 92 ///
93 /// Access through methods below to ensure consistency. 93 /// Access through methods below to ensure consistency.
94 final Queue<_EventRequest> _requestQueue = new Queue(); 94 final Queue<_EventRequest> _requestQueue = new Queue();
95 95
96 /// Create a `StreamQueue` of the events of [source]. 96 /// Create a `StreamQueue` of the events of [source].
97 factory StreamQueue(Stream source) = _StreamQueue<T>; 97 factory StreamQueue(Stream<T> source) = _StreamQueue<T>;
98 98
99 StreamQueue._(); 99 StreamQueue._();
100 100
101 /// Asks if the stream has any more events. 101 /// Asks if the stream has any more events.
102 /// 102 ///
103 /// Returns a future that completes with `true` if the stream has any 103 /// Returns a future that completes with `true` if the stream has any
104 /// more events, whether data or error. 104 /// more events, whether data or error.
105 /// If the stream closes without producing any more events, the returned 105 /// If the stream closes without producing any more events, the returned
106 /// future completes with `false`. 106 /// future completes with `false`.
107 /// 107 ///
(...skipping 159 matching lines...) Expand 10 before | Expand all | Expand 10 after
267 _pause(); 267 _pause();
268 } 268 }
269 } 269 }
270 270
271 /// Extracts a stream from the event source and makes this stream queue 271 /// Extracts a stream from the event source and makes this stream queue
272 /// unusable. 272 /// unusable.
273 /// 273 ///
274 /// Can only be used by the very last request (the stream queue must 274 /// Can only be used by the very last request (the stream queue must
275 /// be closed by that request). 275 /// be closed by that request).
276 /// Only used by [rest]. 276 /// Only used by [rest].
277 Stream _extractStream(); 277 Stream<T> _extractStream();
278 278
279 /// Requests that the event source pauses events. 279 /// Requests that the event source pauses events.
280 /// 280 ///
281 /// This is called automatically when the request queue is empty. 281 /// This is called automatically when the request queue is empty.
282 /// 282 ///
283 /// The event source is restarted by the next call to [_ensureListening]. 283 /// The event source is restarted by the next call to [_ensureListening].
284 void _pause(); 284 void _pause();
285 285
286 /// Ensures that we are listening on events from the event source. 286 /// Ensures that we are listening on events from the event source.
287 /// 287 ///
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after
335 } 335 }
336 } 336 }
337 337
338 338
339 /// The default implementation of [StreamQueue]. 339 /// The default implementation of [StreamQueue].
340 /// 340 ///
341 /// This queue gets its events from a stream which is listened 341 /// This queue gets its events from a stream which is listened
342 /// to when a request needs events. 342 /// to when a request needs events.
343 class _StreamQueue<T> extends StreamQueue<T> { 343 class _StreamQueue<T> extends StreamQueue<T> {
344 /// Source of events. 344 /// Source of events.
345 final Stream _sourceStream; 345 final Stream<T> _sourceStream;
346 346
347 /// Subscription on [_sourceStream] while listening for events. 347 /// Subscription on [_sourceStream] while listening for events.
348 /// 348 ///
349 /// Set to subscription when listening, and set to `null` when the 349 /// Set to subscription when listening, and set to `null` when the
350 /// subscription is done (and [_isDone] is set to true). 350 /// subscription is done (and [_isDone] is set to true).
351 StreamSubscription _subscription; 351 StreamSubscription<T> _subscription;
352 352
353 _StreamQueue(this._sourceStream) : super._(); 353 _StreamQueue(this._sourceStream) : super._();
354 354
355 Future _cancel() { 355 Future _cancel() {
356 if (_isDone) return null; 356 if (_isDone) return null;
357 if (_subscription == null) _subscription = _sourceStream.listen(null); 357 if (_subscription == null) _subscription = _sourceStream.listen(null);
358 var future = _subscription.cancel(); 358 var future = _subscription.cancel();
359 _close(); 359 _close();
360 return future; 360 return future;
361 } 361 }
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
415 /// 415 ///
416 /// Events from the source stream are sent to the first request in the 416 /// Events from the source stream are sent to the first request in the
417 /// queue until it reports itself as [isComplete]. 417 /// queue until it reports itself as [isComplete].
418 /// 418 ///
419 /// When the first request in the queue `isComplete`, either when becoming 419 /// When the first request in the queue `isComplete`, either when becoming
420 /// the first request or after receiving an event, its [close] methods is 420 /// the first request or after receiving an event, its [close] methods is
421 /// called. 421 /// called.
422 /// 422 ///
423 /// The [close] method is also called immediately when the source stream 423 /// The [close] method is also called immediately when the source stream
424 /// is done. 424 /// is done.
425 abstract class _EventRequest { 425 abstract class _EventRequest<T> {
426 /// Handle available events. 426 /// Handle available events.
427 /// 427 ///
428 /// The available events are provided as a queue. The `update` function 428 /// The available events are provided as a queue. The `update` function
429 /// should only remove events from the front of the event queue, e.g., 429 /// should only remove events from the front of the event queue, e.g.,
430 /// using [removeFirst]. 430 /// using [removeFirst].
431 /// 431 ///
432 /// Returns `true` if the request is completed, or `false` if it needs 432 /// Returns `true` if the request is completed, or `false` if it needs
433 /// more events. 433 /// more events.
434 /// The call may keep events in the queue until the requeust is complete, 434 /// The call may keep events in the queue until the requeust is complete,
435 /// or it may remove them immediately. 435 /// or it may remove them immediately.
436 /// 436 ///
437 /// If the method returns true, the request is considered fulfilled, and 437 /// If the method returns true, the request is considered fulfilled, and
438 /// will never be called again. 438 /// will never be called again.
439 /// 439 ///
440 /// This method is called when a request reaches the front of the request 440 /// This method is called when a request reaches the front of the request
441 /// queue, and if it returns `false`, it's called again every time a new event 441 /// queue, and if it returns `false`, it's called again every time a new event
442 /// becomes available, or when the stream closes. 442 /// becomes available, or when the stream closes.
443 /// If the function returns `false` when the stream has already closed 443 /// If the function returns `false` when the stream has already closed
444 /// ([isDone] is true), then the request must call 444 /// ([isDone] is true), then the request must call
445 /// [StreamQueue._updateRequests] itself when it's ready to continue. 445 /// [StreamQueue._updateRequests] itself when it's ready to continue.
446 bool update(Queue<Result> events, bool isDone); 446 bool update(Queue<Result<T>> events, bool isDone);
447 } 447 }
448 448
449 /// Request for a [StreamQueue.next] call. 449 /// Request for a [StreamQueue.next] call.
450 /// 450 ///
451 /// Completes the returned future when receiving the first event, 451 /// Completes the returned future when receiving the first event,
452 /// and is then complete. 452 /// and is then complete.
453 class _NextRequest<T> implements _EventRequest { 453 class _NextRequest<T> implements _EventRequest<T> {
454 /// Completer for the future returned by [StreamQueue.next]. 454 /// Completer for the future returned by [StreamQueue.next].
455 final Completer _completer; 455 final _completer = new Completer<T>();
456 456
457 _NextRequest() : _completer = new Completer<T>(); 457 _NextRequest();
458 458
459 Future<T> get future => _completer.future; 459 Future<T> get future => _completer.future;
460 460
461 bool update(Queue<Result> events, bool isDone) { 461 bool update(Queue<Result<T>> events, bool isDone) {
462 if (events.isNotEmpty) { 462 if (events.isNotEmpty) {
463 events.removeFirst().complete(_completer); 463 events.removeFirst().complete(_completer);
464 return true; 464 return true;
465 } 465 }
466 if (isDone) { 466 if (isDone) {
467 var errorFuture = 467 var errorFuture =
468 new Future.sync(() => throw new StateError("No elements")); 468 new Future.sync(() => throw new StateError("No elements"));
469 _completer.complete(errorFuture); 469 _completer.complete(errorFuture);
470 return true; 470 return true;
471 } 471 }
472 return false; 472 return false;
473 } 473 }
474 } 474 }
475 475
476 /// Request for a [StreamQueue.skip] call. 476 /// Request for a [StreamQueue.skip] call.
477 class _SkipRequest implements _EventRequest { 477 class _SkipRequest<T> implements _EventRequest<T> {
478 /// Completer for the future returned by the skip call. 478 /// Completer for the future returned by the skip call.
479 final Completer _completer = new Completer<int>(); 479 final _completer = new Completer<int>();
480 480
481 /// Number of remaining events to skip. 481 /// Number of remaining events to skip.
482 /// 482 ///
483 /// The request [isComplete] when the values reaches zero. 483 /// The request [isComplete] when the values reaches zero.
484 /// 484 ///
485 /// Decremented when an event is seen. 485 /// Decremented when an event is seen.
486 /// Set to zero when an error is seen since errors abort the skip request. 486 /// Set to zero when an error is seen since errors abort the skip request.
487 int _eventsToSkip; 487 int _eventsToSkip;
488 488
489 _SkipRequest(this._eventsToSkip); 489 _SkipRequest(this._eventsToSkip);
490 490
491 /// The future completed when the correct number of events have been skipped. 491 /// The future completed when the correct number of events have been skipped.
492 Future get future => _completer.future; 492 Future<int> get future => _completer.future;
493 493
494 bool update(Queue<Result> events, bool isDone) { 494 bool update(Queue<Result<T>> events, bool isDone) {
495 while (_eventsToSkip > 0) { 495 while (_eventsToSkip > 0) {
496 if (events.isEmpty) { 496 if (events.isEmpty) {
497 if (isDone) break; 497 if (isDone) break;
498 return false; 498 return false;
499 } 499 }
500 _eventsToSkip--; 500 _eventsToSkip--;
501 501
502 var event = events.removeFirst(); 502 var event = events.removeFirst();
503 if (event.isError) { 503 if (event.isError) {
504 event.complete(_completer); 504 _completer.completeError(event.asError.error, event.asError.stackTrace);
505 return true; 505 return true;
506 } 506 }
507 } 507 }
508 _completer.complete(_eventsToSkip); 508 _completer.complete(_eventsToSkip);
509 return true; 509 return true;
510 } 510 }
511 } 511 }
512 512
513 /// Request for a [StreamQueue.take] call. 513 /// Request for a [StreamQueue.take] call.
514 class _TakeRequest<T> implements _EventRequest { 514 class _TakeRequest<T> implements _EventRequest<T> {
515 /// Completer for the future returned by the take call. 515 /// Completer for the future returned by the take call.
516 final Completer _completer; 516 final _completer = new Completer<List<T>>();
517 517
518 /// List collecting events until enough have been seen. 518 /// List collecting events until enough have been seen.
519 final List _list = <T>[]; 519 final _list = <T>[];
520 520
521 /// Number of events to capture. 521 /// Number of events to capture.
522 /// 522 ///
523 /// The request [isComplete] when the length of [_list] reaches 523 /// The request [isComplete] when the length of [_list] reaches
524 /// this value. 524 /// this value.
525 final int _eventsToTake; 525 final int _eventsToTake;
526 526
527 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); 527 _TakeRequest(this._eventsToTake);
528 528
529 /// The future completed when the correct number of events have been captured. 529 /// The future completed when the correct number of events have been captured.
530 Future get future => _completer.future; 530 Future<List<T>> get future => _completer.future;
531 531
532 bool update(Queue<Result> events, bool isDone) { 532 bool update(Queue<Result<T>> events, bool isDone) {
533 while (_list.length < _eventsToTake) { 533 while (_list.length < _eventsToTake) {
534 if (events.isEmpty) { 534 if (events.isEmpty) {
535 if (isDone) break; 535 if (isDone) break;
536 return false; 536 return false;
537 } 537 }
538 538
539 var result = events.removeFirst(); 539 var event = events.removeFirst();
540 if (result.isError) { 540 if (event.isError) {
541 result.complete(_completer); 541 _completer.completeError(event.asError.error, event.asError.stackTrace);
542 return true; 542 return true;
543 } 543 }
544 _list.add(result.asValue.value); 544 _list.add(event.asValue.value);
545 } 545 }
546 _completer.complete(_list); 546 _completer.complete(_list);
547 return true; 547 return true;
548 } 548 }
549 } 549 }
550 550
551 /// Request for a [StreamQueue.cancel] call. 551 /// Request for a [StreamQueue.cancel] call.
552 /// 552 ///
553 /// The request needs no events, it just waits in the request queue 553 /// The request needs no events, it just waits in the request queue
554 /// until all previous events are fulfilled, then it cancels the stream queue 554 /// until all previous events are fulfilled, then it cancels the stream queue
555 /// source subscription. 555 /// source subscription.
556 class _CancelRequest implements _EventRequest { 556 class _CancelRequest<T> implements _EventRequest<T> {
557 /// Completer for the future returned by the `cancel` call. 557 /// Completer for the future returned by the `cancel` call.
558 final Completer _completer = new Completer(); 558 final _completer = new Completer();
559 559
560 /// The [StreamQueue] object that has this request queued. 560 /// The [StreamQueue] object that has this request queued.
561 /// 561 ///
562 /// When the event is completed, it needs to cancel the active subscription 562 /// When the event is completed, it needs to cancel the active subscription
563 /// of the `StreamQueue` object, if any. 563 /// of the `StreamQueue` object, if any.
564 final StreamQueue _streamQueue; 564 final StreamQueue _streamQueue;
565 565
566 _CancelRequest(this._streamQueue); 566 _CancelRequest(this._streamQueue);
567 567
568 /// The future completed when the cancel request is completed. 568 /// The future completed when the cancel request is completed.
569 Future get future => _completer.future; 569 Future get future => _completer.future;
570 570
571 bool update(Queue<Result> events, bool isDone) { 571 bool update(Queue<Result<T>> events, bool isDone) {
572 if (_streamQueue._isDone) { 572 if (_streamQueue._isDone) {
573 _completer.complete(); 573 _completer.complete();
574 } else { 574 } else {
575 _streamQueue._ensureListening(); 575 _streamQueue._ensureListening();
576 _completer.complete(_streamQueue._extractStream().listen(null).cancel()); 576 _completer.complete(_streamQueue._extractStream().listen(null).cancel());
577 } 577 }
578 return true; 578 return true;
579 } 579 }
580 } 580 }
581 581
582 /// Request for a [StreamQueue.rest] call. 582 /// Request for a [StreamQueue.rest] call.
583 /// 583 ///
584 /// The request is always complete, it just waits in the request queue 584 /// The request is always complete, it just waits in the request queue
585 /// until all previous events are fulfilled, then it takes over the 585 /// until all previous events are fulfilled, then it takes over the
586 /// stream events subscription and creates a stream from it. 586 /// stream events subscription and creates a stream from it.
587 class _RestRequest<T> implements _EventRequest { 587 class _RestRequest<T> implements _EventRequest<T> {
588 /// Completer for the stream returned by the `rest` call. 588 /// Completer for the stream returned by the `rest` call.
589 final StreamCompleter _completer = new StreamCompleter<T>(); 589 final _completer = new StreamCompleter<T>();
590 590
591 /// The [StreamQueue] object that has this request queued. 591 /// The [StreamQueue] object that has this request queued.
592 /// 592 ///
593 /// When the event is completed, it needs to cancel the active subscription 593 /// When the event is completed, it needs to cancel the active subscription
594 /// of the `StreamQueue` object, if any. 594 /// of the `StreamQueue` object, if any.
595 final StreamQueue _streamQueue; 595 final StreamQueue<T> _streamQueue;
596 596
597 _RestRequest(this._streamQueue); 597 _RestRequest(this._streamQueue);
598 598
599 /// The stream which will contain the remaining events of [_streamQueue]. 599 /// The stream which will contain the remaining events of [_streamQueue].
600 Stream<T> get stream => _completer.stream; 600 Stream<T> get stream => _completer.stream;
601 601
602 bool update(Queue<Result> events, bool isDone) { 602 bool update(Queue<Result<T>> events, bool isDone) {
603 if (events.isEmpty) { 603 if (events.isEmpty) {
604 if (_streamQueue._isDone) { 604 if (_streamQueue._isDone) {
605 _completer.setEmpty(); 605 _completer.setEmpty();
606 } else { 606 } else {
607 _completer.setSourceStream(_streamQueue._extractStream()); 607 _completer.setSourceStream(_streamQueue._extractStream());
608 } 608 }
609 } else { 609 } else {
610 // There are prefetched events which needs to be added before the 610 // There are prefetched events which needs to be added before the
611 // remaining stream. 611 // remaining stream.
612 var controller = new StreamController<T>(); 612 var controller = new StreamController<T>();
613 for (var event in events) { 613 for (var event in events) {
614 event.addTo(controller); 614 event.addTo(controller);
615 } 615 }
616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false) 616 controller.addStream(_streamQueue._extractStream(), cancelOnError: false)
617 .whenComplete(controller.close); 617 .whenComplete(controller.close);
618 _completer.setSourceStream(controller.stream); 618 _completer.setSourceStream(controller.stream);
619 } 619 }
620 return true; 620 return true;
621 } 621 }
622 } 622 }
623 623
624 /// Request for a [StreamQueue.hasNext] call. 624 /// Request for a [StreamQueue.hasNext] call.
625 /// 625 ///
626 /// Completes the [future] with `true` if it sees any event, 626 /// Completes the [future] with `true` if it sees any event,
627 /// but doesn't consume the event. 627 /// but doesn't consume the event.
628 /// If the request is closed without seeing an event, then 628 /// If the request is closed without seeing an event, then
629 /// the [future] is completed with `false`. 629 /// the [future] is completed with `false`.
630 class _HasNextRequest<T> implements _EventRequest { 630 class _HasNextRequest<T> implements _EventRequest<T> {
631 final Completer _completer = new Completer<bool>(); 631 final _completer = new Completer<bool>();
632 632
633 Future<bool> get future => _completer.future; 633 Future<bool> get future => _completer.future;
634 634
635 bool update(Queue<Result> events, bool isDone) { 635 bool update(Queue<Result<T>> events, bool isDone) {
636 if (events.isNotEmpty) { 636 if (events.isNotEmpty) {
637 _completer.complete(true); 637 _completer.complete(true);
638 return true; 638 return true;
639 } 639 }
640 if (isDone) { 640 if (isDone) {
641 _completer.complete(false); 641 _completer.complete(false);
642 return true; 642 return true;
643 } 643 }
644 return false; 644 return false;
645 } 645 }
646 } 646 }
OLDNEW
« no previous file with comments | « lib/src/stream_group.dart ('k') | lib/src/stream_sink_completer.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698