| Index: lib/src/stream_queue.dart
|
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
|
| index b8941e1510b1956b022cc6ac0f5bfbb45645b599..c31698b3bed10d1818f640fc105c6002d3e4b962 100644
|
| --- a/lib/src/stream_queue.dart
|
| +++ b/lib/src/stream_queue.dart
|
| @@ -7,7 +7,7 @@ library async.stream_events;
|
| import 'dart:async';
|
| import 'dart:collection';
|
|
|
| -import "forkable_stream.dart";
|
| +import "subscription_fork.dart";
|
| import "subscription_stream.dart";
|
| import "stream_completer.dart";
|
| import "../result.dart";
|
| @@ -79,7 +79,7 @@ class StreamQueue<T> {
|
| // by the content of the fifth event.
|
|
|
| /// Source of events.
|
| - final ForkableStream _sourceStream;
|
| + final Stream _sourceStream;
|
|
|
| /// Subscription on [_sourceStream] while listening for events.
|
| ///
|
| @@ -105,9 +105,7 @@ class StreamQueue<T> {
|
|
|
| /// Create a `StreamQueue` of the events of [source].
|
| StreamQueue(Stream source)
|
| - : _sourceStream = source is ForkableStream
|
| - ? source
|
| - : new ForkableStream(source);
|
| + : _sourceStream = source;
|
|
|
| /// Asks if the stream has any more events.
|
| ///
|
| @@ -230,6 +228,13 @@ class StreamQueue<T> {
|
| StreamQueue<T> fork() {
|
| if (_isClosed) throw _failClosed();
|
|
|
| + _ensureListening();
|
| + if (_subscription is! ForkableSubscription) {
|
| + _subscription = new ForkableSubscription<T>(_subscription);
|
| + _subscription.onData(_onData);
|
| + _subscription.onError(_onError);
|
| + _subscription.onDone(_onDone);
|
| + }
|
| var request = new _ForkRequest<T>(this);
|
| _addRequest(request);
|
| return request.queue;
|
| @@ -679,7 +684,9 @@ class _ForkRequest<T> implements _EventRequest {
|
| if (_streamQueue._isDone) {
|
| _completer.setEmpty();
|
| } else {
|
| - _completer.setSourceStream(_streamQueue._sourceStream.fork());
|
| + assert(_streamQueue._subscription is ForkableSubscription);
|
| + var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork());
|
| + _completer.setSourceStream(stream);
|
| }
|
| } else {
|
| // There are prefetched events which need to be added before the
|
| @@ -689,7 +696,7 @@ class _ForkRequest<T> implements _EventRequest {
|
| event.addTo(controller);
|
| }
|
|
|
| - var fork = _streamQueue._sourceStream.fork();
|
| + var fork = new SubscriptionStream(_streamQueue._subscription.fork());
|
| controller.addStream(fork, cancelOnError: false)
|
| .whenComplete(controller.close);
|
| _completer.setSourceStream(controller.stream);
|
|
|