| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_events; | 5 library async.stream_events; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:collection'; | 8 import 'dart:collection'; |
| 9 | 9 |
| 10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
| (...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 84 /// | 84 /// |
| 85 /// Set to subscription when listening, and set to `null` when the | 85 /// Set to subscription when listening, and set to `null` when the |
| 86 /// subscription is done (and [_isDone] is set to true). | 86 /// subscription is done (and [_isDone] is set to true). |
| 87 StreamSubscription _subscription; | 87 StreamSubscription _subscription; |
| 88 | 88 |
| 89 /// Whether we have listened on [_sourceStream] and the subscription is done. | 89 /// Whether we have listened on [_sourceStream] and the subscription is done. |
| 90 bool _isDone = false; | 90 bool _isDone = false; |
| 91 | 91 |
| 92 /// Whether a closing operation has been performed on the stream queue. | 92 /// Whether a closing operation has been performed on the stream queue. |
| 93 /// | 93 /// |
| 94 /// Closing operations are [cancel], [cancelImmediately], and [rest]. | 94 /// Closing operations are [cancel] and [rest]. |
| 95 bool _isClosed = false; | 95 bool _isClosed = false; |
| 96 | 96 |
| 97 /// Queue of events not used by a request yet. | 97 /// Queue of events not used by a request yet. |
| 98 final Queue<Result> _eventQueue = new Queue(); | 98 final Queue<Result> _eventQueue = new Queue(); |
| 99 | 99 |
| 100 /// Queue of pending requests. | 100 /// Queue of pending requests. |
| 101 /// | 101 /// |
| 102 /// Access through methods below to ensure consistency. | 102 /// Access through methods below to ensure consistency. |
| 103 final Queue<_EventRequest> _requestQueue = new Queue(); | 103 final Queue<_EventRequest> _requestQueue = new Queue(); |
| 104 | 104 |
| (...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 211 if (!_isClosed) { | 211 if (!_isClosed) { |
| 212 var request = new _TakeRequest<T>(count); | 212 var request = new _TakeRequest<T>(count); |
| 213 _addRequest(request); | 213 _addRequest(request); |
| 214 return request.future; | 214 return request.future; |
| 215 } | 215 } |
| 216 throw _failClosed(); | 216 throw _failClosed(); |
| 217 } | 217 } |
| 218 | 218 |
| 219 /// Cancels the underlying stream subscription. | 219 /// Cancels the underlying stream subscription. |
| 220 /// | 220 /// |
| 221 /// The cancel operation waits until all previously requested | 221 /// If [immediate] is `false` (the default), the cancel operation waits until |
| 222 /// events have been processed, then it cancels the subscription | 222 /// all previously requested events have been processed, then it cancels the |
| 223 /// providing the events. | 223 /// subscription providing the events. |
| 224 /// |
| 225 /// If [immediate] is `true`, the subscription is instead canceled |
| 226 /// immediately. Any pending events are completed as though the underlying |
| 227 /// stream had closed. |
| 224 /// | 228 /// |
| 225 /// The returned future completes with the result of calling | 229 /// The returned future completes with the result of calling |
| 226 /// `cancel`. | 230 /// `cancel`. |
| 227 /// | 231 /// |
| 228 /// After calling `cancel`, no further events can be requested. | 232 /// After calling `cancel`, no further events can be requested. |
| 229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 233 /// None of [next], [rest], [skip], [take] or [cancel] may be |
| 230 /// called again. | 234 /// called again. |
| 231 Future cancel() { | 235 Future cancel({bool immediate: false}) { |
| 232 if (!_isClosed) { | 236 if (_isClosed) throw _failClosed(); |
| 233 _isClosed = true; | 237 _isClosed = true; |
| 238 |
| 239 if (!immediate) { |
| 234 var request = new _CancelRequest(this); | 240 var request = new _CancelRequest(this); |
| 235 _addRequest(request); | 241 _addRequest(request); |
| 236 return request.future; | 242 return request.future; |
| 237 } | 243 } |
| 238 throw _failClosed(); | |
| 239 } | |
| 240 | |
| 241 /// Cancels the underlying stream subscription immediately. | |
| 242 /// | |
| 243 /// Any pending events will complete as though the stream had closed when | |
| 244 /// [cancel] was called. | |
| 245 /// | |
| 246 /// The returned future completes with the result of calling | |
| 247 /// `StreamSubscription.cancel`. | |
| 248 /// | |
| 249 /// After calling `cancelImmediately`, no further events can be requested. | |
| 250 /// None of [next], [rest], [skip], [take] or [cancel] may be called again. | |
| 251 Future cancelImmediately() { | |
| 252 if (_isClosed) throw _failClosed(); | |
| 253 _isClosed = true; | |
| 254 | 244 |
| 255 if (_isDone) return new Future.value(); | 245 if (_isDone) return new Future.value(); |
| 256 if (_subscription == null) _subscription = _sourceStream.listen(null); | 246 if (_subscription == null) _subscription = _sourceStream.listen(null); |
| 257 var future = _subscription.cancel(); | 247 var future = _subscription.cancel(); |
| 258 _onDone(); | 248 _onDone(); |
| 259 return future; | 249 return future; |
| 260 } | 250 } |
| 261 | 251 |
| 262 /// Returns an error for when a request is made after cancel. | 252 /// Returns an error for when a request is made after cancel. |
| 263 /// | 253 /// |
| 264 /// Returns a [StateError] with a message saying that either | 254 /// Returns a [StateError] with a message saying that either |
| 265 /// [cancel], [cancelImmediately], or [rest] have already been called. | 255 /// [cancel] or [rest] have already been called. |
| 266 Error _failClosed() { | 256 Error _failClosed() { |
| 267 return new StateError("Already cancelled"); | 257 return new StateError("Already cancelled"); |
| 268 } | 258 } |
| 269 | 259 |
| 270 // Callbacks receiving the events of the source stream. | 260 // Callbacks receiving the events of the source stream. |
| 271 | 261 |
| 272 void _onData(T data) { | 262 void _onData(T data) { |
| 273 _eventQueue.add(new Result.value(data)); | 263 _eventQueue.add(new Result.value(data)); |
| 274 _checkQueues(); | 264 _checkQueues(); |
| 275 } | 265 } |
| (...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 630 _completer.complete(true); | 620 _completer.complete(true); |
| 631 return true; | 621 return true; |
| 632 } | 622 } |
| 633 return false; | 623 return false; |
| 634 } | 624 } |
| 635 | 625 |
| 636 void close(_) { | 626 void close(_) { |
| 637 _completer.complete(false); | 627 _completer.complete(false); |
| 638 } | 628 } |
| 639 } | 629 } |
| OLD | NEW |