OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 512 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
523 bool addEvents(Queue<Result> events) { | 523 bool addEvents(Queue<Result> events) { |
524 _shutdown(); | 524 _shutdown(); |
525 return true; | 525 return true; |
526 } | 526 } |
527 | 527 |
528 void close(_) { | 528 void close(_) { |
529 _shutdown(); | 529 _shutdown(); |
530 } | 530 } |
531 | 531 |
532 void _shutdown() { | 532 void _shutdown() { |
533 if (_streamQueue._subscription == null) { | 533 if (_streamQueue._isDone) { |
534 _completer.complete(); | 534 _completer.complete(); |
535 } else { | 535 } else { |
| 536 _streamQueue._ensureListening(); |
536 _completer.complete(_streamQueue._dispose().cancel()); | 537 _completer.complete(_streamQueue._dispose().cancel()); |
537 } | 538 } |
538 } | 539 } |
539 } | 540 } |
540 | 541 |
541 /// Request for a [StreamQueue.rest] call. | 542 /// Request for a [StreamQueue.rest] call. |
542 /// | 543 /// |
543 /// The request is always complete, it just waits in the request queue | 544 /// The request is always complete, it just waits in the request queue |
544 /// until all previous events are fulfilled, then it takes over the | 545 /// until all previous events are fulfilled, then it takes over the |
545 /// stream events subscription and creates a stream from it. | 546 /// stream events subscription and creates a stream from it. |
(...skipping 74 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
620 _completer.complete(true); | 621 _completer.complete(true); |
621 return true; | 622 return true; |
622 } | 623 } |
623 return false; | 624 return false; |
624 } | 625 } |
625 | 626 |
626 void close(_) { | 627 void close(_) { |
627 _completer.complete(false); | 628 _completer.complete(false); |
628 } | 629 } |
629 } | 630 } |
OLD | NEW |