Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
| (...skipping 200 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 211 if (!_isClosed) { | 211 if (!_isClosed) { |
| 212 var request = new _TakeRequest<T>(count); | 212 var request = new _TakeRequest<T>(count); |
| 213 _addRequest(request); | 213 _addRequest(request); |
| 214 return request.future; | 214 return request.future; |
| 215 } | 215 } |
| 216 throw _failClosed(); | 216 throw _failClosed(); |
| 217 } | 217 } |
| 218 | 218 |
| 219 /// Cancels the underlying stream subscription. | 219 /// Cancels the underlying stream subscription. |
| 220 /// | 220 /// |
| 221 /// The cancel operation waits until all previously requested | 221 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 222 /// events have been processed, then it cancels the subscription | 222 /// all previously requested events have been processed, then it cancels the |
| 223 /// providing the events. | 223 /// subscription providing the events. |
| 224 /// | |
| 225 /// If [immediate] is `true`, the subscription is instead canceled | |
| 226 /// immediately. Any pending events will complete as though the stream had | |
|
floitsch
2015/07/09 09:47:52
Could you please rewrite the "Any pending events .
nweiz
2015/07/09 19:43:36
Can you say more about what you didn't understand?
floitsch
2015/07/13 09:39:38
Any pending events complete with a 'closed'-event,
nweiz
2015/07/13 20:07:21
Right, but what about that sentence is confusing?
| |
| 227 /// closed when [cancel] was called. | |
| 224 /// | 228 /// |
| 225 /// The returned future completes with the result of calling | 229 /// The returned future completes with the result of calling |
| 226 /// `cancel`. | 230 /// `cancel`. |
| 227 /// | 231 /// |
| 228 /// After calling `cancel`, no further events can be requested. | 232 /// After calling `cancel`, no further events can be requested. |
| 229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 233 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 230 /// called again. | 234 /// called again. |
| 231 Future cancel() { | 235 Future cancel({bool immediate: false}) { |
| 232 if (!_isClosed) { | 236 if (_isClosed) throw _failClosed(); |
| 233 _isClosed = true; | 237 _isClosed = true; |
| 238 | |
| 239 if (!immediate) { | |
| 234 var request = new _CancelRequest(this); | 240 var request = new _CancelRequest(this); |
| 235 _addRequest(request); | 241 _addRequest(request); |
| 236 return request.future; | 242 return request.future; |
| 237 } | 243 } |
| 238 throw _failClosed(); | 244 |
| 245 if (_isDone) return new Future.value(); | |
| 246 if (_subscription == null) _subscription = _sourceStream.listen(null); | |
| 247 var future = _subscription.cancel(); | |
| 248 _onDone(); | |
| 249 return future; | |
| 239 } | 250 } |
|
Lasse Reichstein Nielsen
2015/07/08 07:49:22
This is really two different functions dispatched
nweiz
2015/07/09 01:03:19
I considered that, but ended up going this directi
floitsch
2015/07/09 09:47:52
One function looks fine to me.
| |
| 240 | 251 |
| 241 /// Returns an error for when a request is made after cancel. | 252 /// Returns an error for when a request is made after cancel. |
| 242 /// | 253 /// |
| 243 /// Returns a [StateError] with a message saying that either | 254 /// Returns a [StateError] with a message saying that either |
| 244 /// [cancel] or [rest] have already been called. | 255 /// [cancel] or [rest] have already been called. |
| 245 Error _failClosed() { | 256 Error _failClosed() { |
| 246 return new StateError("Already cancelled"); | 257 return new StateError("Already cancelled"); |
| 247 } | 258 } |
| 248 | 259 |
| 249 // Callbacks receiving the events of the source stream. | 260 // Callbacks receiving the events of the source stream. |
| (...skipping 23 matching lines...) Expand all Loading... | |
| 273 if (!request.addEvents(_eventQueue)) { | 284 if (!request.addEvents(_eventQueue)) { |
| 274 request.close(_eventQueue); | 285 request.close(_eventQueue); |
| 275 } | 286 } |
| 276 return; | 287 return; |
| 277 } | 288 } |
| 278 if (_requestQueue.isEmpty) { | 289 if (_requestQueue.isEmpty) { |
| 279 if (request.addEvents(_eventQueue)) return; | 290 if (request.addEvents(_eventQueue)) return; |
| 280 _ensureListening(); | 291 _ensureListening(); |
| 281 } | 292 } |
| 282 _requestQueue.add(request); | 293 _requestQueue.add(request); |
| 283 | |
| 284 } | 294 } |
| 285 | 295 |
| 286 /// Ensures that we are listening on events from [_sourceStream]. | 296 /// Ensures that we are listening on events from [_sourceStream]. |
| 287 /// | 297 /// |
| 288 /// Resumes subscription on [_sourceStream], or creates it if necessary. | 298 /// Resumes subscription on [_sourceStream], or creates it if necessary. |
| 289 void _ensureListening() { | 299 void _ensureListening() { |
| 290 assert(!_isDone); | 300 assert(!_isDone); |
| 291 if (_subscription == null) { | 301 if (_subscription == null) { |
| 292 _subscription = | 302 _subscription = |
| 293 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | 303 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| (...skipping 316 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 610 _completer.complete(true); | 620 _completer.complete(true); |
| 611 return true; | 621 return true; |
| 612 } | 622 } |
| 613 return false; | 623 return false; |
| 614 } | 624 } |
| 615 | 625 |
| 616 void close(_) { | 626 void close(_) { |
| 617 _completer.complete(false); | 627 _completer.complete(false); |
| 618 } | 628 } |
| 619 } | 629 } |
| OLD | NEW |