Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(477)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1221713005: Add an [immediate] argument to StreamQueue.cancel. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "subscription_stream.dart"; 10 import "subscription_stream.dart";
(...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 if (!_isClosed) { 211 if (!_isClosed) {
212 var request = new _TakeRequest<T>(count); 212 var request = new _TakeRequest<T>(count);
213 _addRequest(request); 213 _addRequest(request);
214 return request.future; 214 return request.future;
215 } 215 }
216 throw _failClosed(); 216 throw _failClosed();
217 } 217 }
218 218
219 /// Cancels the underlying stream subscription. 219 /// Cancels the underlying stream subscription.
220 /// 220 ///
221 /// The cancel operation waits until all previously requested 221 /// If [immediate] is `false` (the default), the cancel operation waits until
222 /// events have been processed, then it cancels the subscription 222 /// all previously requested events have been processed, then it cancels the
223 /// providing the events. 223 /// subscription providing the events.
224 ///
225 /// If [immediate] is `true`, the subscription is instead canceled
226 /// immediately. Any pending events will complete as though the stream had
floitsch 2015/07/09 09:47:52 Could you please rewrite the "Any pending events .
nweiz 2015/07/09 19:43:36 Can you say more about what you didn't understand?
floitsch 2015/07/13 09:39:38 Any pending events complete with a 'closed'-event,
nweiz 2015/07/13 20:07:21 Right, but what about that sentence is confusing?
227 /// closed when [cancel] was called.
224 /// 228 ///
225 /// The returned future completes with the result of calling 229 /// The returned future completes with the result of calling
226 /// `cancel`. 230 /// `cancel`.
227 /// 231 ///
228 /// After calling `cancel`, no further events can be requested. 232 /// After calling `cancel`, no further events can be requested.
229 /// None of [next], [rest], [skip], [take] or [cancel] may be 233 /// None of [next], [rest], [skip], [take] or [cancel] may be
230 /// called again. 234 /// called again.
231 Future cancel() { 235 Future cancel({bool immediate: false}) {
232 if (!_isClosed) { 236 if (_isClosed) throw _failClosed();
233 _isClosed = true; 237 _isClosed = true;
238
239 if (!immediate) {
234 var request = new _CancelRequest(this); 240 var request = new _CancelRequest(this);
235 _addRequest(request); 241 _addRequest(request);
236 return request.future; 242 return request.future;
237 } 243 }
238 throw _failClosed(); 244
245 if (_isDone) return new Future.value();
246 if (_subscription == null) _subscription = _sourceStream.listen(null);
247 var future = _subscription.cancel();
248 _onDone();
249 return future;
239 } 250 }
Lasse Reichstein Nielsen 2015/07/08 07:49:22 This is really two different functions dispatched
nweiz 2015/07/09 01:03:19 I considered that, but ended up going this directi
floitsch 2015/07/09 09:47:52 One function looks fine to me.
240 251
241 /// Returns an error for when a request is made after cancel. 252 /// Returns an error for when a request is made after cancel.
242 /// 253 ///
243 /// Returns a [StateError] with a message saying that either 254 /// Returns a [StateError] with a message saying that either
244 /// [cancel] or [rest] have already been called. 255 /// [cancel] or [rest] have already been called.
245 Error _failClosed() { 256 Error _failClosed() {
246 return new StateError("Already cancelled"); 257 return new StateError("Already cancelled");
247 } 258 }
248 259
249 // Callbacks receiving the events of the source stream. 260 // Callbacks receiving the events of the source stream.
(...skipping 23 matching lines...) Expand all
273 if (!request.addEvents(_eventQueue)) { 284 if (!request.addEvents(_eventQueue)) {
274 request.close(_eventQueue); 285 request.close(_eventQueue);
275 } 286 }
276 return; 287 return;
277 } 288 }
278 if (_requestQueue.isEmpty) { 289 if (_requestQueue.isEmpty) {
279 if (request.addEvents(_eventQueue)) return; 290 if (request.addEvents(_eventQueue)) return;
280 _ensureListening(); 291 _ensureListening();
281 } 292 }
282 _requestQueue.add(request); 293 _requestQueue.add(request);
283
284 } 294 }
285 295
286 /// Ensures that we are listening on events from [_sourceStream]. 296 /// Ensures that we are listening on events from [_sourceStream].
287 /// 297 ///
288 /// Resumes subscription on [_sourceStream], or creates it if necessary. 298 /// Resumes subscription on [_sourceStream], or creates it if necessary.
289 void _ensureListening() { 299 void _ensureListening() {
290 assert(!_isDone); 300 assert(!_isDone);
291 if (_subscription == null) { 301 if (_subscription == null) {
292 _subscription = 302 _subscription =
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); 303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
(...skipping 316 matching lines...) Expand 10 before | Expand all | Expand 10 after
610 _completer.complete(true); 620 _completer.complete(true);
611 return true; 621 return true;
612 } 622 }
613 return false; 623 return false;
614 } 624 }
615 625
616 void close(_) { 626 void close(_) {
617 _completer.complete(false); 627 _completer.complete(false);
618 } 628 }
619 } 629 }
OLDNEW
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698