Chromium Code Reviews| Index: lib/src/stream_queue.dart |
| diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart |
| index 7d78ac593c79181bb504897f8f3b611130a3b48b..ba4041eefaa1014cead7eea1a5f56be503b0204b 100644 |
| --- a/lib/src/stream_queue.dart |
| +++ b/lib/src/stream_queue.dart |
| @@ -7,6 +7,7 @@ library async.stream_events; |
| import 'dart:async'; |
| import 'dart:collection'; |
| +import "forkable_stream.dart"; |
| import "subscription_stream.dart"; |
| import "stream_completer.dart"; |
| import "../result.dart"; |
| @@ -78,7 +79,7 @@ class StreamQueue<T> { |
| // by the content of the fifth event. |
| /// Source of events. |
| - final Stream _sourceStream; |
| + final ForkableStream _sourceStream; |
| /// Subscription on [_sourceStream] while listening for events. |
| /// |
| @@ -104,7 +105,9 @@ class StreamQueue<T> { |
| /// Create a `StreamQueue` of the events of [source]. |
| StreamQueue(Stream source) |
| - : _sourceStream = source; |
| + : _sourceStream = source is ForkableStream |
| + ? source |
| + : new ForkableStream(source); |
|
Lasse Reichstein Nielsen
2015/07/15 20:10:51
Would it be possible to make the fork method witho
nweiz
2015/07/15 22:19:43
I don't think this is possible without drastically
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I still think this could/should be handled at a di
nweiz
2015/07/17 20:30:16
In your example, just forking a stream causes it t
|
| /// Asks if the stream has any more events. |
| /// |
| @@ -216,6 +219,22 @@ class StreamQueue<T> { |
| throw _failClosed(); |
| } |
| + /// Creates a new stream queue in the same position as this one. |
| + /// |
| + /// The fork is subscribed to the same underlying stream as this queue, but |
| + /// it's otherwise wholly independent. If requests are made on one, they don't |
| + /// move the other forward; if one is closed, the other is still open. |
| + /// |
| + /// The underlying stream will only be paused when all forks have no |
| + /// outstanding requests, and only canceled when all forks are canceled. |
| + StreamQueue<T> fork() { |
|
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I have another problem with fork: It may cause buf
nweiz
2015/07/17 20:30:16
I can't emphasize enough how important forking is
|
| + if (_isClosed) throw _failClosed(); |
| + |
| + var request = new _ForkRequest<T>(this); |
| + _addRequest(request); |
| + return request.queue; |
| + } |
| + |
| /// Cancels the underlying stream subscription. |
| /// |
| /// If [immediate] is `false` (the default), the cancel operation waits until |
| @@ -236,14 +255,15 @@ class StreamQueue<T> { |
| if (_isClosed) throw _failClosed(); |
| _isClosed = true; |
| + if (_isDone) return new Future.value(); |
| + if (_subscription == null) _subscription = _sourceStream.listen(null); |
| + |
| if (!immediate) { |
| var request = new _CancelRequest(this); |
| _addRequest(request); |
| return request.future; |
| } |
| - if (_isDone) return new Future.value(); |
| - if (_subscription == null) _subscription = _sourceStream.listen(null); |
| var future = _subscription.cancel(); |
| _onDone(); |
| return future; |
| @@ -333,6 +353,7 @@ class StreamQueue<T> { |
| return; |
| } |
| } |
| + |
| if (!_isDone) { |
| _subscription.pause(); |
| } |
| @@ -627,3 +648,50 @@ class _HasNextRequest<T> implements _EventRequest { |
| _completer.complete(false); |
| } |
| } |
| + |
| +/// Request for a [StreamQueue.fork] call. |
| +class _ForkRequest<T> implements _EventRequest { |
| + /// Completer for the stream used by the queue by the `fork` call. |
| + StreamCompleter _completer; |
| + |
| + StreamQueue<T> queue; |
| + |
| + /// The [StreamQueue] object that has this request queued. |
| + final StreamQueue _streamQueue; |
| + |
| + _ForkRequest(this._streamQueue) { |
| + _completer = new StreamCompleter<T>(); |
| + queue = new StreamQueue<T>(_completer.stream); |
| + } |
| + |
| + bool addEvents(Queue<Result> events) { |
| + _completeStream(events); |
| + return true; |
| + } |
| + |
| + void close(Queue<Result> events) { |
| + _completeStream(events); |
| + } |
| + |
| + void _completeStream(Queue<Result> events) { |
| + if (events.isEmpty) { |
| + if (_streamQueue._isDone) { |
| + _completer.setEmpty(); |
| + } else { |
| + _completer.setSourceStream(_streamQueue._sourceStream.fork()); |
| + } |
| + } else { |
| + // There are prefetched events which need to be added before the |
| + // remaining stream. |
| + var controller = new StreamController<T>(); |
| + for (var event in events) { |
| + event.addTo(controller); |
| + } |
| + |
| + var fork = _streamQueue._sourceStream.fork(); |
| + controller.addStream(fork, cancelOnError: false) |
| + .whenComplete(controller.close); |
| + _completer.setSourceStream(controller.stream); |
| + } |
| + } |
| +} |