Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(150)

Side by Side Diff: lib/src/util/stream_queue.dart

Issue 1960503002: Fix all strong-mode errors and warnings. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: .analysis_options Created 4 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « lib/src/util/io.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/ 5 // TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
6 // lands. 6 // lands.
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "package:async/async.dart" hide ForkableStream, StreamQueue; 10 import "package:async/async.dart" hide ForkableStream, StreamQueue;
(...skipping 60 matching lines...) Expand 10 before | Expand all | Expand 10 after
71 // again when new events are available. 71 // again when new events are available.
72 // The request can remove events that it uses, or keep them in the event 72 // The request can remove events that it uses, or keep them in the event
73 // queue until it has all that it needs. 73 // queue until it has all that it needs.
74 // 74 //
75 // This model is very flexible and easily extensible. 75 // This model is very flexible and easily extensible.
76 // It allows requests that don't consume events (like [hasNext]) or 76 // It allows requests that don't consume events (like [hasNext]) or
77 // potentially a request that takes either five or zero events, determined 77 // potentially a request that takes either five or zero events, determined
78 // by the content of the fifth event. 78 // by the content of the fifth event.
79 79
80 /// Source of events. 80 /// Source of events.
81 final ForkableStream _sourceStream; 81 final ForkableStream<T> _sourceStream;
82 82
83 /// Subscription on [_sourceStream] while listening for events. 83 /// Subscription on [_sourceStream] while listening for events.
84 /// 84 ///
85 /// Set to subscription when listening, and set to `null` when the 85 /// Set to subscription when listening, and set to `null` when the
86 /// subscription is done (and [_isDone] is set to true). 86 /// subscription is done (and [_isDone] is set to true).
87 StreamSubscription _subscription; 87 StreamSubscription<T> _subscription;
88 88
89 /// Whether we have listened on [_sourceStream] and the subscription is done. 89 /// Whether we have listened on [_sourceStream] and the subscription is done.
90 bool _isDone = false; 90 bool _isDone = false;
91 91
92 /// Whether a closing operation has been performed on the stream queue. 92 /// Whether a closing operation has been performed on the stream queue.
93 /// 93 ///
94 /// Closing operations are [cancel] and [rest]. 94 /// Closing operations are [cancel] and [rest].
95 bool _isClosed = false; 95 bool _isClosed = false;
96 96
97 /// Queue of events not used by a request yet. 97 /// Queue of events not used by a request yet.
98 final Queue<Result> _eventQueue = new Queue(); 98 final Queue<Result> _eventQueue = new Queue();
99 99
100 /// Queue of pending requests. 100 /// Queue of pending requests.
101 /// 101 ///
102 /// Access through methods below to ensure consistency. 102 /// Access through methods below to ensure consistency.
103 final Queue<_EventRequest> _requestQueue = new Queue(); 103 final Queue<_EventRequest> _requestQueue = new Queue();
104 104
105 /// Create a `StreamQueue` of the events of [source]. 105 /// Create a `StreamQueue` of the events of [source].
106 StreamQueue(Stream source) 106 StreamQueue(Stream<T> source)
107 : _sourceStream = source is ForkableStream 107 : _sourceStream = source is ForkableStream
108 ? source 108 ? source
109 : new ForkableStream(source); 109 : new ForkableStream(source);
110 110
111 /// Asks if the stream has any more events. 111 /// Asks if the stream has any more events.
112 /// 112 ///
113 /// Returns a future that completes with `true` if the stream has any 113 /// Returns a future that completes with `true` if the stream has any
114 /// more events, whether data or error. 114 /// more events, whether data or error.
115 /// If the stream closes without producing any more events, the returned 115 /// If the stream closes without producing any more events, the returned
116 /// future completes with `false`. 116 /// future completes with `false`.
(...skipping 237 matching lines...) Expand 10 before | Expand all | Expand 10 after
354 } 354 }
355 355
356 if (!_isDone) { 356 if (!_isDone) {
357 _subscription.pause(); 357 _subscription.pause();
358 } 358 }
359 } 359 }
360 360
361 /// Extracts the subscription and makes this stream queue unusable. 361 /// Extracts the subscription and makes this stream queue unusable.
362 /// 362 ///
363 /// Can only be used by the very last request. 363 /// Can only be used by the very last request.
364 StreamSubscription _dispose() { 364 StreamSubscription<T> _dispose() {
365 assert(_isClosed); 365 assert(_isClosed);
366 var subscription = _subscription; 366 var subscription = _subscription;
367 _subscription = null; 367 _subscription = null;
368 _isDone = true; 368 _isDone = true;
369 return subscription; 369 return subscription;
370 } 370 }
371 } 371 }
372 372
373 /// Request object that receives events when they arrive, until fulfilled. 373 /// Request object that receives events when they arrive, until fulfilled.
374 /// 374 ///
(...skipping 44 matching lines...) Expand 10 before | Expand all | Expand 10 after
419 /// this is the last chance to use them. 419 /// this is the last chance to use them.
420 void close(Queue<Result> events); 420 void close(Queue<Result> events);
421 } 421 }
422 422
423 /// Request for a [StreamQueue.next] call. 423 /// Request for a [StreamQueue.next] call.
424 /// 424 ///
425 /// Completes the returned future when receiving the first event, 425 /// Completes the returned future when receiving the first event,
426 /// and is then complete. 426 /// and is then complete.
427 class _NextRequest<T> implements _EventRequest { 427 class _NextRequest<T> implements _EventRequest {
428 /// Completer for the future returned by [StreamQueue.next]. 428 /// Completer for the future returned by [StreamQueue.next].
429 final Completer _completer; 429 final _completer = new Completer<T>();
430 430
431 _NextRequest() : _completer = new Completer<T>(); 431 _NextRequest();
432 432
433 Future<T> get future => _completer.future; 433 Future<T> get future => _completer.future;
434 434
435 bool addEvents(Queue<Result> events) { 435 bool addEvents(Queue<Result> events) {
436 if (events.isEmpty) return false; 436 if (events.isEmpty) return false;
437 events.removeFirst().complete(_completer); 437 events.removeFirst().complete(_completer);
438 return true; 438 return true;
439 } 439 }
440 440
441 void close(Queue<Result> events) { 441 void close(Queue<Result> events) {
442 var errorFuture = 442 var errorFuture =
443 new Future.sync(() => throw new StateError("No elements")); 443 new Future.sync(() => throw new StateError("No elements"));
444 _completer.complete(errorFuture); 444 _completer.complete(errorFuture);
445 } 445 }
446 } 446 }
447 447
448 /// Request for a [StreamQueue.skip] call. 448 /// Request for a [StreamQueue.skip] call.
449 class _SkipRequest implements _EventRequest { 449 class _SkipRequest implements _EventRequest {
450 /// Completer for the future returned by the skip call. 450 /// Completer for the future returned by the skip call.
451 final Completer _completer = new Completer<int>(); 451 final _completer = new Completer<int>();
452 452
453 /// Number of remaining events to skip. 453 /// Number of remaining events to skip.
454 /// 454 ///
455 /// The request [isComplete] when the values reaches zero. 455 /// The request [isComplete] when the values reaches zero.
456 /// 456 ///
457 /// Decremented when an event is seen. 457 /// Decremented when an event is seen.
458 /// Set to zero when an error is seen since errors abort the skip request. 458 /// Set to zero when an error is seen since errors abort the skip request.
459 int _eventsToSkip; 459 int _eventsToSkip;
460 460
461 _SkipRequest(this._eventsToSkip); 461 _SkipRequest(this._eventsToSkip);
462 462
463 /// The future completed when the correct number of events have been skipped. 463 /// The future completed when the correct number of events have been skipped.
464 Future get future => _completer.future; 464 Future<int> get future => _completer.future;
465 465
466 bool addEvents(Queue<Result> events) { 466 bool addEvents(Queue<Result> events) {
467 while (_eventsToSkip > 0) { 467 while (_eventsToSkip > 0) {
468 if (events.isEmpty) return false; 468 if (events.isEmpty) return false;
469 _eventsToSkip--; 469 _eventsToSkip--;
470 var event = events.removeFirst(); 470 var event = events.removeFirst();
471 if (event.isError) { 471 if (event.isError) {
472 event.complete(_completer); 472 event.complete(_completer);
473 return true; 473 return true;
474 } 474 }
475 } 475 }
476 _completer.complete(0); 476 _completer.complete(0);
477 return true; 477 return true;
478 } 478 }
479 479
480 void close(Queue<Result> events) { 480 void close(Queue<Result> events) {
481 _completer.complete(_eventsToSkip); 481 _completer.complete(_eventsToSkip);
482 } 482 }
483 } 483 }
484 484
485 /// Request for a [StreamQueue.take] call. 485 /// Request for a [StreamQueue.take] call.
486 class _TakeRequest<T> implements _EventRequest { 486 class _TakeRequest<T> implements _EventRequest {
487 /// Completer for the future returned by the take call. 487 /// Completer for the future returned by the take call.
488 final Completer _completer; 488 final _completer = new Completer<List<T>>();
489 489
490 /// List collecting events until enough have been seen. 490 /// List collecting events until enough have been seen.
491 final List _list = <T>[]; 491 final List _list = <T>[];
492 492
493 /// Number of events to capture. 493 /// Number of events to capture.
494 /// 494 ///
495 /// The request [isComplete] when the length of [_list] reaches 495 /// The request [isComplete] when the length of [_list] reaches
496 /// this value. 496 /// this value.
497 final int _eventsToTake; 497 final int _eventsToTake;
498 498
499 _TakeRequest(this._eventsToTake) : _completer = new Completer<List<T>>(); 499 _TakeRequest(this._eventsToTake);
500 500
501 /// The future completed when the correct number of events have been captured. 501 /// The future completed when the correct number of events have been captured.
502 Future get future => _completer.future; 502 Future<List<T>> get future => _completer.future;
503 503
504 bool addEvents(Queue<Result> events) { 504 bool addEvents(Queue<Result> events) {
505 while (_list.length < _eventsToTake) { 505 while (_list.length < _eventsToTake) {
506 if (events.isEmpty) return false; 506 if (events.isEmpty) return false;
507 var result = events.removeFirst(); 507 var result = events.removeFirst();
508 if (result.isError) { 508 if (result.isError) {
509 result.complete(_completer); 509 result.complete(_completer);
510 return true; 510 return true;
511 } 511 }
512 _list.add(result.asValue.value); 512 _list.add(result.asValue.value);
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after
559 } 559 }
560 } 560 }
561 561
562 /// Request for a [StreamQueue.rest] call. 562 /// Request for a [StreamQueue.rest] call.
563 /// 563 ///
564 /// The request is always complete, it just waits in the request queue 564 /// The request is always complete, it just waits in the request queue
565 /// until all previous events are fulfilled, then it takes over the 565 /// until all previous events are fulfilled, then it takes over the
566 /// stream events subscription and creates a stream from it. 566 /// stream events subscription and creates a stream from it.
567 class _RestRequest<T> implements _EventRequest { 567 class _RestRequest<T> implements _EventRequest {
568 /// Completer for the stream returned by the `rest` call. 568 /// Completer for the stream returned by the `rest` call.
569 final StreamCompleter _completer = new StreamCompleter<T>(); 569 final _completer = new StreamCompleter<T>();
570 570
571 /// The [StreamQueue] object that has this request queued. 571 /// The [StreamQueue] object that has this request queued.
572 /// 572 ///
573 /// When the event is completed, it needs to cancel the active subscription 573 /// When the event is completed, it needs to cancel the active subscription
574 /// of the `StreamQueue` object, if any. 574 /// of the `StreamQueue` object, if any.
575 final StreamQueue _streamQueue; 575 final StreamQueue<T> _streamQueue;
576 576
577 _RestRequest(this._streamQueue); 577 _RestRequest(this._streamQueue);
578 578
579 /// The stream which will contain the remaining events of [_streamQueue]. 579 /// The stream which will contain the remaining events of [_streamQueue].
580 Stream<T> get stream => _completer.stream; 580 Stream<T> get stream => _completer.stream;
581 581
582 bool addEvents(Queue<Result> events) { 582 bool addEvents(Queue<Result> events) {
583 _completeStream(events); 583 _completeStream(events);
584 return true; 584 return true;
585 } 585 }
(...skipping 16 matching lines...) Expand all
602 for (var event in events) { 602 for (var event in events) {
603 event.addTo(controller); 603 event.addTo(controller);
604 } 604 }
605 controller.addStream(_getRestStream(), cancelOnError: false) 605 controller.addStream(_getRestStream(), cancelOnError: false)
606 .whenComplete(controller.close); 606 .whenComplete(controller.close);
607 _completer.setSourceStream(controller.stream); 607 _completer.setSourceStream(controller.stream);
608 } 608 }
609 } 609 }
610 610
611 /// Create a stream from the rest of [_streamQueue]'s subscription. 611 /// Create a stream from the rest of [_streamQueue]'s subscription.
612 Stream _getRestStream() { 612 Stream<T> _getRestStream() {
613 if (_streamQueue._isDone) { 613 if (_streamQueue._isDone) {
614 var controller = new StreamController<T>()..close(); 614 var controller = new StreamController<T>()..close();
615 return controller.stream; 615 return controller.stream;
616 // TODO(lrn). Use the following when 1.11 is released. 616 // TODO(lrn). Use the following when 1.11 is released.
617 // return new Stream<T>.empty(); 617 // return new Stream<T>.empty();
618 } 618 }
619 if (_streamQueue._subscription == null) { 619 if (_streamQueue._subscription == null) {
620 return _streamQueue._sourceStream; 620 return _streamQueue._sourceStream;
621 } 621 }
622 var subscription = _streamQueue._dispose(); 622 var subscription = _streamQueue._dispose();
623 subscription.resume(); 623 subscription.resume();
624 return new SubscriptionStream<T>(subscription); 624 return new SubscriptionStream<T>(subscription);
625 } 625 }
626 } 626 }
627 627
628 /// Request for a [StreamQueue.hasNext] call. 628 /// Request for a [StreamQueue.hasNext] call.
629 /// 629 ///
630 /// Completes the [future] with `true` if it sees any event, 630 /// Completes the [future] with `true` if it sees any event,
631 /// but doesn't consume the event. 631 /// but doesn't consume the event.
632 /// If the request is closed without seeing an event, then 632 /// If the request is closed without seeing an event, then
633 /// the [future] is completed with `false`. 633 /// the [future] is completed with `false`.
634 class _HasNextRequest<T> implements _EventRequest { 634 class _HasNextRequest<T> implements _EventRequest {
635 final Completer _completer = new Completer<bool>(); 635 final _completer = new Completer<bool>();
636 636
637 Future<bool> get future => _completer.future; 637 Future<bool> get future => _completer.future;
638 638
639 bool addEvents(Queue<Result> events) { 639 bool addEvents(Queue<Result> events) {
640 if (events.isNotEmpty) { 640 if (events.isNotEmpty) {
641 _completer.complete(true); 641 _completer.complete(true);
642 return true; 642 return true;
643 } 643 }
644 return false; 644 return false;
645 } 645 }
646 646
647 void close(_) { 647 void close(_) {
648 _completer.complete(false); 648 _completer.complete(false);
649 } 649 }
650 } 650 }
651 651
652 /// Request for a [StreamQueue.fork] call. 652 /// Request for a [StreamQueue.fork] call.
653 class _ForkRequest<T> implements _EventRequest { 653 class _ForkRequest<T> implements _EventRequest {
654 /// Completer for the stream used by the queue by the `fork` call. 654 /// Completer for the stream used by the queue by the `fork` call.
655 StreamCompleter _completer; 655 StreamCompleter<T> _completer;
656 656
657 StreamQueue<T> queue; 657 StreamQueue<T> queue;
658 658
659 /// The [StreamQueue] object that has this request queued. 659 /// The [StreamQueue] object that has this request queued.
660 final StreamQueue _streamQueue; 660 final StreamQueue<T> _streamQueue;
661 661
662 _ForkRequest(this._streamQueue) { 662 _ForkRequest(this._streamQueue) {
663 _completer = new StreamCompleter<T>(); 663 _completer = new StreamCompleter();
664 queue = new StreamQueue<T>(_completer.stream); 664 queue = new StreamQueue(_completer.stream);
665 } 665 }
666 666
667 bool addEvents(Queue<Result> events) { 667 bool addEvents(Queue<Result> events) {
668 _completeStream(events); 668 _completeStream(events);
669 return true; 669 return true;
670 } 670 }
671 671
672 void close(Queue<Result> events) { 672 void close(Queue<Result> events) {
673 _completeStream(events); 673 _completeStream(events);
674 } 674 }
(...skipping 13 matching lines...) Expand all
688 event.addTo(controller); 688 event.addTo(controller);
689 } 689 }
690 690
691 var fork = _streamQueue._sourceStream.fork(); 691 var fork = _streamQueue._sourceStream.fork();
692 controller.addStream(fork, cancelOnError: false) 692 controller.addStream(fork, cancelOnError: false)
693 .whenComplete(controller.close); 693 .whenComplete(controller.close);
694 _completer.setSourceStream(controller.stream); 694 _completer.setSourceStream(controller.stream);
695 } 695 }
696 } 696 }
697 } 697 }
OLDNEW
« no previous file with comments | « lib/src/util/io.dart ('k') | lib/src/utils.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698