OLD | NEW |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library async.stream_events; | 5 library async.stream_events; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:collection'; | 8 import 'dart:collection'; |
9 | 9 |
10 import "subscription_stream.dart"; | 10 import "subscription_stream.dart"; |
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
84 /// | 84 /// |
85 /// Set to subscription when listening, and set to `null` when the | 85 /// Set to subscription when listening, and set to `null` when the |
86 /// subscription is done (and [_isDone] is set to true). | 86 /// subscription is done (and [_isDone] is set to true). |
87 StreamSubscription _subscription; | 87 StreamSubscription _subscription; |
88 | 88 |
89 /// Whether we have listened on [_sourceStream] and the subscription is done. | 89 /// Whether we have listened on [_sourceStream] and the subscription is done. |
90 bool _isDone = false; | 90 bool _isDone = false; |
91 | 91 |
92 /// Whether a closing operation has been performed on the stream queue. | 92 /// Whether a closing operation has been performed on the stream queue. |
93 /// | 93 /// |
94 /// Closing operations are [cancel], [cancelImmediately], and [rest]. | 94 /// Closing operations are [cancel] and [rest]. |
95 bool _isClosed = false; | 95 bool _isClosed = false; |
96 | 96 |
97 /// Queue of events not used by a request yet. | 97 /// Queue of events not used by a request yet. |
98 final Queue<Result> _eventQueue = new Queue(); | 98 final Queue<Result> _eventQueue = new Queue(); |
99 | 99 |
100 /// Queue of pending requests. | 100 /// Queue of pending requests. |
101 /// | 101 /// |
102 /// Access through methods below to ensure consistency. | 102 /// Access through methods below to ensure consistency. |
103 final Queue<_EventRequest> _requestQueue = new Queue(); | 103 final Queue<_EventRequest> _requestQueue = new Queue(); |
104 | 104 |
(...skipping 106 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
211 if (!_isClosed) { | 211 if (!_isClosed) { |
212 var request = new _TakeRequest<T>(count); | 212 var request = new _TakeRequest<T>(count); |
213 _addRequest(request); | 213 _addRequest(request); |
214 return request.future; | 214 return request.future; |
215 } | 215 } |
216 throw _failClosed(); | 216 throw _failClosed(); |
217 } | 217 } |
218 | 218 |
219 /// Cancels the underlying stream subscription. | 219 /// Cancels the underlying stream subscription. |
220 /// | 220 /// |
221 /// The cancel operation waits until all previously requested | 221 /// If [immediate] is `false` (the default), the cancel operation waits until |
222 /// events have been processed, then it cancels the subscription | 222 /// all previously requested events have been processed, then it cancels the |
223 /// providing the events. | 223 /// subscription providing the events. |
| 224 /// |
| 225 /// If [immediate] is `true`, the subscription is instead canceled |
| 226 /// immediately. Any pending events are completed as though the underlying |
| 227 /// stream had closed. |
224 /// | 228 /// |
225 /// The returned future completes with the result of calling | 229 /// The returned future completes with the result of calling |
226 /// `cancel`. | 230 /// `cancel`. |
227 /// | 231 /// |
228 /// After calling `cancel`, no further events can be requested. | 232 /// After calling `cancel`, no further events can be requested. |
229 /// None of [next], [rest], [skip], [take] or [cancel] may be | 233 /// None of [next], [rest], [skip], [take] or [cancel] may be |
230 /// called again. | 234 /// called again. |
231 Future cancel() { | 235 Future cancel({bool immediate: false}) { |
232 if (!_isClosed) { | 236 if (_isClosed) throw _failClosed(); |
233 _isClosed = true; | 237 _isClosed = true; |
| 238 |
| 239 if (!immediate) { |
234 var request = new _CancelRequest(this); | 240 var request = new _CancelRequest(this); |
235 _addRequest(request); | 241 _addRequest(request); |
236 return request.future; | 242 return request.future; |
237 } | 243 } |
238 throw _failClosed(); | |
239 } | |
240 | |
241 /// Cancels the underlying stream subscription immediately. | |
242 /// | |
243 /// Any pending events will complete as though the stream had closed when | |
244 /// [cancel] was called. | |
245 /// | |
246 /// The returned future completes with the result of calling | |
247 /// `StreamSubscription.cancel`. | |
248 /// | |
249 /// After calling `cancelImmediately`, no further events can be requested. | |
250 /// None of [next], [rest], [skip], [take] or [cancel] may be called again. | |
251 Future cancelImmediately() { | |
252 if (_isClosed) throw _failClosed(); | |
253 _isClosed = true; | |
254 | 244 |
255 if (_isDone) return new Future.value(); | 245 if (_isDone) return new Future.value(); |
256 if (_subscription == null) _subscription = _sourceStream.listen(null); | 246 if (_subscription == null) _subscription = _sourceStream.listen(null); |
257 var future = _subscription.cancel(); | 247 var future = _subscription.cancel(); |
258 _onDone(); | 248 _onDone(); |
259 return future; | 249 return future; |
260 } | 250 } |
261 | 251 |
262 /// Returns an error for when a request is made after cancel. | 252 /// Returns an error for when a request is made after cancel. |
263 /// | 253 /// |
264 /// Returns a [StateError] with a message saying that either | 254 /// Returns a [StateError] with a message saying that either |
265 /// [cancel], [cancelImmediately], or [rest] have already been called. | 255 /// [cancel] or [rest] have already been called. |
266 Error _failClosed() { | 256 Error _failClosed() { |
267 return new StateError("Already cancelled"); | 257 return new StateError("Already cancelled"); |
268 } | 258 } |
269 | 259 |
270 // Callbacks receiving the events of the source stream. | 260 // Callbacks receiving the events of the source stream. |
271 | 261 |
272 void _onData(T data) { | 262 void _onData(T data) { |
273 _eventQueue.add(new Result.value(data)); | 263 _eventQueue.add(new Result.value(data)); |
274 _checkQueues(); | 264 _checkQueues(); |
275 } | 265 } |
(...skipping 354 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
630 _completer.complete(true); | 620 _completer.complete(true); |
631 return true; | 621 return true; |
632 } | 622 } |
633 return false; | 623 return false; |
634 } | 624 } |
635 | 625 |
636 void close(_) { | 626 void close(_) { |
637 _completer.complete(false); | 627 _completer.complete(false); |
638 } | 628 } |
639 } | 629 } |
OLD | NEW |