Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(286)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1239543004: Fix a bug in StreamQueue.cancel(). (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "subscription_stream.dart"; 10 import "subscription_stream.dart";
(...skipping 218 matching lines...) Expand 10 before | Expand all | Expand 10 after
229 /// The returned future completes with the result of calling 229 /// The returned future completes with the result of calling
230 /// `cancel`. 230 /// `cancel`.
231 /// 231 ///
232 /// After calling `cancel`, no further events can be requested. 232 /// After calling `cancel`, no further events can be requested.
233 /// None of [next], [rest], [skip], [take] or [cancel] may be 233 /// None of [next], [rest], [skip], [take] or [cancel] may be
234 /// called again. 234 /// called again.
235 Future cancel({bool immediate: false}) { 235 Future cancel({bool immediate: false}) {
236 if (_isClosed) throw _failClosed(); 236 if (_isClosed) throw _failClosed();
237 _isClosed = true; 237 _isClosed = true;
238 238
239 if (_isDone) return new Future.value();
240 if (_subscription == null) _subscription = _sourceStream.listen(null);
241
239 if (!immediate) { 242 if (!immediate) {
240 var request = new _CancelRequest(this); 243 var request = new _CancelRequest(this);
241 _addRequest(request); 244 _addRequest(request);
242 return request.future; 245 return request.future;
Lasse Reichstein Nielsen 2015/07/14 07:28:10 That doesn't look right either - you subscribe to
243 } 246 }
244 247
245 if (_isDone) return new Future.value();
246 if (_subscription == null) _subscription = _sourceStream.listen(null);
247 var future = _subscription.cancel(); 248 var future = _subscription.cancel();
248 _onDone(); 249 _onDone();
249 return future; 250 return future;
250 } 251 }
251 252
252 /// Returns an error for when a request is made after cancel. 253 /// Returns an error for when a request is made after cancel.
253 /// 254 ///
254 /// Returns a [StateError] with a message saying that either 255 /// Returns a [StateError] with a message saying that either
255 /// [cancel] or [rest] have already been called. 256 /// [cancel] or [rest] have already been called.
256 Error _failClosed() { 257 Error _failClosed() {
(...skipping 267 matching lines...) Expand 10 before | Expand all | Expand 10 after
524 _shutdown(); 525 _shutdown();
525 return true; 526 return true;
526 } 527 }
527 528
528 void close(_) { 529 void close(_) {
529 _shutdown(); 530 _shutdown();
530 } 531 }
531 532
532 void _shutdown() { 533 void _shutdown() {
533 if (_streamQueue._subscription == null) { 534 if (_streamQueue._subscription == null) {
534 _completer.complete(); 535 _completer.complete();
Lasse Reichstein Nielsen 2015/07/14 07:28:10 This skips creating the subscription if it's null.
nweiz 2015/07/14 20:14:08 Done.
535 } else { 536 } else {
536 _completer.complete(_streamQueue._dispose().cancel()); 537 _completer.complete(_streamQueue._dispose().cancel());
537 } 538 }
538 } 539 }
539 } 540 }
540 541
541 /// Request for a [StreamQueue.rest] call. 542 /// Request for a [StreamQueue.rest] call.
542 /// 543 ///
543 /// The request is always complete, it just waits in the request queue 544 /// The request is always complete, it just waits in the request queue
544 /// until all previous events are fulfilled, then it takes over the 545 /// until all previous events are fulfilled, then it takes over the
(...skipping 75 matching lines...) Expand 10 before | Expand all | Expand 10 after
620 _completer.complete(true); 621 _completer.complete(true);
621 return true; 622 return true;
622 } 623 }
623 return false; 624 return false;
624 } 625 }
625 626
626 void close(_) { 627 void close(_) {
627 _completer.complete(false); 628 _completer.complete(false);
628 } 629 }
629 } 630 }
OLDNEW
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698