Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 0018710b90a313b5b9042d9bd8280db14ddc85b5..5703f0af0c036d36ba50f32784a7355f70a57a43 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -218,9 +218,13 @@ class StreamQueue<T> { |
/// Cancels the underlying stream subscription. |
/// |
- /// The cancel operation waits until all previously requested |
- /// events have been processed, then it cancels the subscription |
- /// providing the events. |
+ /// If [immediate] is `false` (the default), the cancel operation waits until |
+ /// all previously requested events have been processed, then it cancels the |
+ /// subscription providing the events. |
+ /// |
+ /// If [immediate] is `true`, the subscription is instead canceled |
+ /// immediately. Any pending events will complete as though the stream had |
floitsch
2015/07/09 09:47:52
Could you please rewrite the "Any pending events .
nweiz
2015/07/09 19:43:36
Can you say more about what you didn't understand?
floitsch
2015/07/13 09:39:38
Any pending events complete with a 'closed'-event,
nweiz
2015/07/13 20:07:21
Right, but what about that sentence is confusing?
|
+ /// closed when [cancel] was called. |
/// |
/// The returned future completes with the result of calling |
/// `cancel`. |
@@ -228,14 +232,21 @@ class StreamQueue<T> { |
/// After calling `cancel`, no further events can be requested. |
/// None of [next], [rest], [skip], [take] or [cancel] may be |
/// called again. |
- Future cancel() { |
- if (!_isClosed) { |
- _isClosed = true; |
+ Future cancel({bool immediate: false}) { |
+ if (_isClosed) throw _failClosed(); |
+ _isClosed = true; |
+ |
+ if (!immediate) { |
var request = new _CancelRequest(this); |
_addRequest(request); |
return request.future; |
} |
- throw _failClosed(); |
+ |
+ if (_isDone) return new Future.value(); |
+ if (_subscription == null) _subscription = _sourceStream.listen(null); |
+ var future = _subscription.cancel(); |
+ _onDone(); |
+ return future; |
} |
Lasse Reichstein Nielsen
2015/07/08 07:49:22
This is really two different functions dispatched
nweiz
2015/07/09 01:03:19
I considered that, but ended up going this directi
floitsch
2015/07/09 09:47:52
One function looks fine to me.
|
/// Returns an error for when a request is made after cancel. |
@@ -280,7 +291,6 @@ class StreamQueue<T> { |
_ensureListening(); |
} |
_requestQueue.add(request); |
- |
} |
/// Ensures that we are listening on events from [_sourceStream]. |