Index: lib/src/stream_queue.dart |
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
index b8941e1510b1956b022cc6ac0f5bfbb45645b599..c31698b3bed10d1818f640fc105c6002d3e4b962 100644 |
--- a/lib/src/stream_queue.dart |
+++ b/lib/src/stream_queue.dart |
@@ -7,7 +7,7 @@ library async.stream_events; |
import 'dart:async'; |
import 'dart:collection'; |
-import "forkable_stream.dart"; |
+import "subscription_fork.dart"; |
import "subscription_stream.dart"; |
import "stream_completer.dart"; |
import "../result.dart"; |
@@ -79,7 +79,7 @@ class StreamQueue<T> { |
// by the content of the fifth event. |
/// Source of events. |
- final ForkableStream _sourceStream; |
+ final Stream _sourceStream; |
/// Subscription on [_sourceStream] while listening for events. |
/// |
@@ -105,9 +105,7 @@ class StreamQueue<T> { |
/// Create a `StreamQueue` of the events of [source]. |
StreamQueue(Stream source) |
- : _sourceStream = source is ForkableStream |
- ? source |
- : new ForkableStream(source); |
+ : _sourceStream = source; |
/// Asks if the stream has any more events. |
/// |
@@ -230,6 +228,13 @@ class StreamQueue<T> { |
StreamQueue<T> fork() { |
if (_isClosed) throw _failClosed(); |
+ _ensureListening(); |
+ if (_subscription is! ForkableSubscription) { |
+ _subscription = new ForkableSubscription<T>(_subscription); |
+ _subscription.onData(_onData); |
+ _subscription.onError(_onError); |
+ _subscription.onDone(_onDone); |
+ } |
var request = new _ForkRequest<T>(this); |
_addRequest(request); |
return request.queue; |
@@ -679,7 +684,9 @@ class _ForkRequest<T> implements _EventRequest { |
if (_streamQueue._isDone) { |
_completer.setEmpty(); |
} else { |
- _completer.setSourceStream(_streamQueue._sourceStream.fork()); |
+ assert(_streamQueue._subscription is ForkableSubscription); |
+ var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork()); |
+ _completer.setSourceStream(stream); |
} |
} else { |
// There are prefetched events which need to be added before the |
@@ -689,7 +696,7 @@ class _ForkRequest<T> implements _EventRequest { |
event.addTo(controller); |
} |
- var fork = _streamQueue._sourceStream.fork(); |
+ var fork = new SubscriptionStream(_streamQueue._subscription.fork()); |
controller.addStream(fork, cancelOnError: false) |
.whenComplete(controller.close); |
_completer.setSourceStream(controller.stream); |