Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(73)

Side by Side Diff: lib/src/stream_queue.dart

Issue 1223423002: Use a named param for StreamQueue.cancelImmediately. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Code review changes Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 library async.stream_events; 5 library async.stream_events;
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:collection'; 8 import 'dart:collection';
9 9
10 import "subscription_stream.dart"; 10 import "subscription_stream.dart";
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
84 /// 84 ///
85 /// Set to subscription when listening, and set to `null` when the 85 /// Set to subscription when listening, and set to `null` when the
86 /// subscription is done (and [_isDone] is set to true). 86 /// subscription is done (and [_isDone] is set to true).
87 StreamSubscription _subscription; 87 StreamSubscription _subscription;
88 88
89 /// Whether we have listened on [_sourceStream] and the subscription is done. 89 /// Whether we have listened on [_sourceStream] and the subscription is done.
90 bool _isDone = false; 90 bool _isDone = false;
91 91
92 /// Whether a closing operation has been performed on the stream queue. 92 /// Whether a closing operation has been performed on the stream queue.
93 /// 93 ///
94 /// Closing operations are [cancel], [cancelImmediately], and [rest]. 94 /// Closing operations are [cancel] and [rest].
95 bool _isClosed = false; 95 bool _isClosed = false;
96 96
97 /// Queue of events not used by a request yet. 97 /// Queue of events not used by a request yet.
98 final Queue<Result> _eventQueue = new Queue(); 98 final Queue<Result> _eventQueue = new Queue();
99 99
100 /// Queue of pending requests. 100 /// Queue of pending requests.
101 /// 101 ///
102 /// Access through methods below to ensure consistency. 102 /// Access through methods below to ensure consistency.
103 final Queue<_EventRequest> _requestQueue = new Queue(); 103 final Queue<_EventRequest> _requestQueue = new Queue();
104 104
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after
211 if (!_isClosed) { 211 if (!_isClosed) {
212 var request = new _TakeRequest<T>(count); 212 var request = new _TakeRequest<T>(count);
213 _addRequest(request); 213 _addRequest(request);
214 return request.future; 214 return request.future;
215 } 215 }
216 throw _failClosed(); 216 throw _failClosed();
217 } 217 }
218 218
219 /// Cancels the underlying stream subscription. 219 /// Cancels the underlying stream subscription.
220 /// 220 ///
221 /// The cancel operation waits until all previously requested 221 /// If [immediate] is `false` (the default), the cancel operation waits until
222 /// events have been processed, then it cancels the subscription 222 /// all previously requested events have been processed, then it cancels the
223 /// providing the events. 223 /// subscription providing the events.
224 ///
225 /// If [immediate] is `true`, the subscription is instead canceled
226 /// immediately. Any pending events are completed as though the underlying
227 /// stream had closed.
224 /// 228 ///
225 /// The returned future completes with the result of calling 229 /// The returned future completes with the result of calling
226 /// `cancel`. 230 /// `cancel`.
227 /// 231 ///
228 /// After calling `cancel`, no further events can be requested. 232 /// After calling `cancel`, no further events can be requested.
229 /// None of [next], [rest], [skip], [take] or [cancel] may be 233 /// None of [next], [rest], [skip], [take] or [cancel] may be
230 /// called again. 234 /// called again.
231 Future cancel() { 235 Future cancel({bool immediate: false}) {
232 if (!_isClosed) { 236 if (_isClosed) throw _failClosed();
233 _isClosed = true; 237 _isClosed = true;
238
239 if (!immediate) {
234 var request = new _CancelRequest(this); 240 var request = new _CancelRequest(this);
235 _addRequest(request); 241 _addRequest(request);
236 return request.future; 242 return request.future;
237 } 243 }
238 throw _failClosed();
239 }
240
241 /// Cancels the underlying stream subscription immediately.
242 ///
243 /// Any pending events will complete as though the stream had closed when
244 /// [cancel] was called.
245 ///
246 /// The returned future completes with the result of calling
247 /// `StreamSubscription.cancel`.
248 ///
249 /// After calling `cancelImmediately`, no further events can be requested.
250 /// None of [next], [rest], [skip], [take] or [cancel] may be called again.
251 Future cancelImmediately() {
252 if (_isClosed) throw _failClosed();
253 _isClosed = true;
254 244
255 if (_isDone) return new Future.value(); 245 if (_isDone) return new Future.value();
256 if (_subscription == null) _subscription = _sourceStream.listen(null); 246 if (_subscription == null) _subscription = _sourceStream.listen(null);
257 var future = _subscription.cancel(); 247 var future = _subscription.cancel();
258 _onDone(); 248 _onDone();
259 return future; 249 return future;
260 } 250 }
261 251
262 /// Returns an error for when a request is made after cancel. 252 /// Returns an error for when a request is made after cancel.
263 /// 253 ///
264 /// Returns a [StateError] with a message saying that either 254 /// Returns a [StateError] with a message saying that either
265 /// [cancel], [cancelImmediately], or [rest] have already been called. 255 /// [cancel] or [rest] have already been called.
266 Error _failClosed() { 256 Error _failClosed() {
267 return new StateError("Already cancelled"); 257 return new StateError("Already cancelled");
268 } 258 }
269 259
270 // Callbacks receiving the events of the source stream. 260 // Callbacks receiving the events of the source stream.
271 261
272 void _onData(T data) { 262 void _onData(T data) {
273 _eventQueue.add(new Result.value(data)); 263 _eventQueue.add(new Result.value(data));
274 _checkQueues(); 264 _checkQueues();
275 } 265 }
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after
630 _completer.complete(true); 620 _completer.complete(true);
631 return true; 621 return true;
632 } 622 }
633 return false; 623 return false;
634 } 624 }
635 625
636 void close(_) { 626 void close(_) {
637 _completer.complete(false); 627 _completer.complete(false);
638 } 628 }
639 } 629 }
OLDNEW
« no previous file with comments | « no previous file | test/stream_queue_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698