Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index 7d78ac593c79181bb504897f8f3b611130a3b48b..ba4041eefaa1014cead7eea1a5f56be503b0204b 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -7,6 +7,7 @@ library async.stream_events; |
import 'dart:async'; |
import 'dart:collection'; |
+import "forkable_stream.dart"; |
import "subscription_stream.dart"; |
import "stream_completer.dart"; |
import "../result.dart"; |
@@ -78,7 +79,7 @@ class StreamQueue<T> { |
// by the content of the fifth event. |
/// Source of events. |
- final Stream _sourceStream; |
+ final ForkableStream _sourceStream; |
/// Subscription on [_sourceStream] while listening for events. |
/// |
@@ -104,7 +105,9 @@ class StreamQueue<T> { |
/// Create a `StreamQueue` of the events of [source]. |
StreamQueue(Stream source) |
- : _sourceStream = source; |
+ : _sourceStream = source is ForkableStream |
+ ? source |
+ : new ForkableStream(source); |
Lasse Reichstein Nielsen
2015/07/15 20:10:51
Would it be possible to make the fork method witho
nweiz
2015/07/15 22:19:43
I don't think this is possible without drastically
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I still think this could/should be handled at a di
nweiz
2015/07/17 20:30:16
In your example, just forking a stream causes it t
|
/// Asks if the stream has any more events. |
/// |
@@ -216,6 +219,22 @@ class StreamQueue<T> { |
throw _failClosed(); |
} |
+ /// Creates a new stream queue in the same position as this one. |
+ /// |
+ /// The fork is subscribed to the same underlying stream as this queue, but |
+ /// it's otherwise wholly independent. If requests are made on one, they don't |
+ /// move the other forward; if one is closed, the other is still open. |
+ /// |
+ /// The underlying stream will only be paused when all forks have no |
+ /// outstanding requests, and only canceled when all forks are canceled. |
+ StreamQueue<T> fork() { |
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I have another problem with fork: It may cause buf
nweiz
2015/07/17 20:30:16
I can't emphasize enough how important forking is
|
+ if (_isClosed) throw _failClosed(); |
+ |
+ var request = new _ForkRequest<T>(this); |
+ _addRequest(request); |
+ return request.queue; |
+ } |
+ |
/// Cancels the underlying stream subscription. |
/// |
/// If [immediate] is `false` (the default), the cancel operation waits until |
@@ -236,14 +255,15 @@ class StreamQueue<T> { |
if (_isClosed) throw _failClosed(); |
_isClosed = true; |
+ if (_isDone) return new Future.value(); |
+ if (_subscription == null) _subscription = _sourceStream.listen(null); |
+ |
if (!immediate) { |
var request = new _CancelRequest(this); |
_addRequest(request); |
return request.future; |
} |
- if (_isDone) return new Future.value(); |
- if (_subscription == null) _subscription = _sourceStream.listen(null); |
var future = _subscription.cancel(); |
_onDone(); |
return future; |
@@ -333,6 +353,7 @@ class StreamQueue<T> { |
return; |
} |
} |
+ |
if (!_isDone) { |
_subscription.pause(); |
} |
@@ -627,3 +648,50 @@ class _HasNextRequest<T> implements _EventRequest { |
_completer.complete(false); |
} |
} |
+ |
+/// Request for a [StreamQueue.fork] call. |
+class _ForkRequest<T> implements _EventRequest { |
+ /// Completer for the stream used by the queue by the `fork` call. |
+ StreamCompleter _completer; |
+ |
+ StreamQueue<T> queue; |
+ |
+ /// The [StreamQueue] object that has this request queued. |
+ final StreamQueue _streamQueue; |
+ |
+ _ForkRequest(this._streamQueue) { |
+ _completer = new StreamCompleter<T>(); |
+ queue = new StreamQueue<T>(_completer.stream); |
+ } |
+ |
+ bool addEvents(Queue<Result> events) { |
+ _completeStream(events); |
+ return true; |
+ } |
+ |
+ void close(Queue<Result> events) { |
+ _completeStream(events); |
+ } |
+ |
+ void _completeStream(Queue<Result> events) { |
+ if (events.isEmpty) { |
+ if (_streamQueue._isDone) { |
+ _completer.setEmpty(); |
+ } else { |
+ _completer.setSourceStream(_streamQueue._sourceStream.fork()); |
+ } |
+ } else { |
+ // There are prefetched events which need to be added before the |
+ // remaining stream. |
+ var controller = new StreamController<T>(); |
+ for (var event in events) { |
+ event.addTo(controller); |
+ } |
+ |
+ var fork = _streamQueue._sourceStream.fork(); |
+ controller.addStream(fork, cancelOnError: false) |
+ .whenComplete(controller.close); |
+ _completer.setSourceStream(controller.stream); |
+ } |
+ } |
+} |