OLD | NEW |
---|---|
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
211 if (!_isClosed) { | 211 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 212 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 213 _addRequest(request); |
214 return request.future; | 214 return request.future; |
215 } | 215 } |
216 throw _failClosed(); | 216 throw _failClosed(); |
217 } | 217 } |
218 | 218 |
219 /// Cancels the underlying stream subscription. | 219 /// Cancels the underlying stream subscription. |
220 /// | 220 /// |
221 /// The cancel operation waits until all previously requested | 221 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// events have been processed, then it cancels the subscription | 222 /// all previously requested events have been processed, then it cancels the |
223 /// providing the events. | 223 /// subscription providing the events. |
224 /// | |
225 /// If [immediate] is `true`, the subscription is instead canceled | |
226 /// immediately. Any pending events will complete as though the stream had | |
floitsch
2015/07/09 09:47:52
Could you please rewrite the "Any pending events .
nweiz
2015/07/09 19:43:36
Can you say more about what you didn't understand?
floitsch
2015/07/13 09:39:38
Any pending events complete with a 'closed'-event,
nweiz
2015/07/13 20:07:21
Right, but what about that sentence is confusing?
| |
227 /// closed when [cancel] was called. | |
224 /// | 228 /// |
225 /// The returned future completes with the result of calling | 229 /// The returned future completes with the result of calling |
226 /// `cancel`. | 230 /// `cancel`. |
227 /// | 231 /// |
228 /// After calling `cancel`, no further events can be requested. | 232 /// After calling `cancel`, no further events can be requested. |
229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 233 /// None of [next], [rest], [skip], [take] or [cancel] may be |
230 /// called again. | 234 /// called again. |
231 Future cancel() { | 235 Future cancel({bool immediate: false}) { |
232 if (!_isClosed) { | 236 if (_isClosed) throw _failClosed(); |
233 _isClosed = true; | 237 _isClosed = true; |
238 | |
239 if (!immediate) { | |
234 var request = new _CancelRequest(this); | 240 var request = new _CancelRequest(this); |
235 _addRequest(request); | 241 _addRequest(request); |
236 return request.future; | 242 return request.future; |
237 } | 243 } |
238 throw _failClosed(); | 244 |
245 if (_isDone) return new Future.value(); | |
246 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
247 var future = _subscription.cancel(); | |
248 _onDone(); | |
249 return future; | |
239 } | 250 } |
Lasse Reichstein Nielsen
2015/07/08 07:49:22
This is really two different functions dispatched
nweiz
2015/07/09 01:03:19
I considered that, but ended up going this directi
floitsch
2015/07/09 09:47:52
One function looks fine to me.
| |
240 | 251 |
241 /// Returns an error for when a request is made after cancel. | 252 /// Returns an error for when a request is made after cancel. |
242 /// | 253 /// |
243 /// Returns a [StateError] with a message saying that either | 254 /// Returns a [StateError] with a message saying that either |
244 /// [cancel] or [rest] have already been called. | 255 /// [cancel] or [rest] have already been called. |
245 Error _failClosed() { | 256 Error _failClosed() { |
246 return new StateError("Already cancelled"); | 257 return new StateError("Already cancelled"); |
247 } | 258 } |
248 | 259 |
249 // Callbacks receiving the events of the source stream. | 260 // Callbacks receiving the events of the source stream. |
(...skipping 23 matching lines...) Expand all Loading... | |
273 if (!request.addEvents(_eventQueue)) { | 284 if (!request.addEvents(_eventQueue)) { |
274 request.close(_eventQueue); | 285 request.close(_eventQueue); |
275 } | 286 } |
276 return; | 287 return; |
277 } | 288 } |
278 if (_requestQueue.isEmpty) { | 289 if (_requestQueue.isEmpty) { |
279 if (request.addEvents(_eventQueue)) return; | 290 if (request.addEvents(_eventQueue)) return; |
280 _ensureListening(); | 291 _ensureListening(); |
281 } | 292 } |
282 _requestQueue.add(request); | 293 _requestQueue.add(request); |
283 | |
284 } | 294 } |
285 | 295 |
286 /// Ensures that we are listening on events from [_sourceStream]. | 296 /// Ensures that we are listening on events from [_sourceStream]. |
287 /// | 297 /// |
288 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 298 /// Resumes subscription on [_sourceStream], or creates it if necessary. |
289 void _ensureListening() { | 299 void _ensureListening() { |
290 assert(!_isDone); | 300 assert(!_isDone); |
291 if (_subscription == null) { | 301 if (_subscription == null) { |
292 _subscription = | 302 _subscription = |
293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
(...skipping 316 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
610 _completer.complete(true); | 620 _completer.complete(true); |
611 return true; | 621 return true; |
612 } | 622 } |
613 return false; | 623 return false; |
614 } | 624 } |
615 | 625 |
616 void close(_) { | 626 void close(_) { |
617 _completer.complete(false); | 627 _completer.complete(false); |
618 } | 628 } |
619 } | 629 } |
OLD | NEW |