Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(632)

Unified Diff: lib/src/stream_queue.dart

Issue 1241723003: Add StreamQueue.fork and ForkableStream. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_queue.dart
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index 7d78ac593c79181bb504897f8f3b611130a3b48b..ba4041eefaa1014cead7eea1a5f56be503b0204b 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -7,6 +7,7 @@ library async.stream_events;
import 'dart:async';
import 'dart:collection';
+import "forkable_stream.dart";
import "subscription_stream.dart";
import "stream_completer.dart";
import "../result.dart";
@@ -78,7 +79,7 @@ class StreamQueue<T> {
// by the content of the fifth event.
/// Source of events.
- final Stream _sourceStream;
+ final ForkableStream _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
@@ -104,7 +105,9 @@ class StreamQueue<T> {
/// Create a `StreamQueue` of the events of [source].
StreamQueue(Stream source)
- : _sourceStream = source;
+ : _sourceStream = source is ForkableStream
+ ? source
+ : new ForkableStream(source);
Lasse Reichstein Nielsen 2015/07/15 20:10:51 Would it be possible to make the fork method witho
nweiz 2015/07/15 22:19:43 I don't think this is possible without drastically
Lasse Reichstein Nielsen 2015/07/16 14:01:23 I still think this could/should be handled at a di
nweiz 2015/07/17 20:30:16 In your example, just forking a stream causes it t
/// Asks if the stream has any more events.
///
@@ -216,6 +219,22 @@ class StreamQueue<T> {
throw _failClosed();
}
+ /// Creates a new stream queue in the same position as this one.
+ ///
+ /// The fork is subscribed to the same underlying stream as this queue, but
+ /// it's otherwise wholly independent. If requests are made on one, they don't
+ /// move the other forward; if one is closed, the other is still open.
+ ///
+ /// The underlying stream will only be paused when all forks have no
+ /// outstanding requests, and only canceled when all forks are canceled.
+ StreamQueue<T> fork() {
Lasse Reichstein Nielsen 2015/07/16 14:01:23 I have another problem with fork: It may cause buf
nweiz 2015/07/17 20:30:16 I can't emphasize enough how important forking is
+ if (_isClosed) throw _failClosed();
+
+ var request = new _ForkRequest<T>(this);
+ _addRequest(request);
+ return request.queue;
+ }
+
/// Cancels the underlying stream subscription.
///
/// If [immediate] is `false` (the default), the cancel operation waits until
@@ -236,14 +255,15 @@ class StreamQueue<T> {
if (_isClosed) throw _failClosed();
_isClosed = true;
+ if (_isDone) return new Future.value();
+ if (_subscription == null) _subscription = _sourceStream.listen(null);
+
if (!immediate) {
var request = new _CancelRequest(this);
_addRequest(request);
return request.future;
}
- if (_isDone) return new Future.value();
- if (_subscription == null) _subscription = _sourceStream.listen(null);
var future = _subscription.cancel();
_onDone();
return future;
@@ -333,6 +353,7 @@ class StreamQueue<T> {
return;
}
}
+
if (!_isDone) {
_subscription.pause();
}
@@ -627,3 +648,50 @@ class _HasNextRequest<T> implements _EventRequest {
_completer.complete(false);
}
}
+
+/// Request for a [StreamQueue.fork] call.
+class _ForkRequest<T> implements _EventRequest {
+ /// Completer for the stream used by the queue by the `fork` call.
+ StreamCompleter _completer;
+
+ StreamQueue<T> queue;
+
+ /// The [StreamQueue] object that has this request queued.
+ final StreamQueue _streamQueue;
+
+ _ForkRequest(this._streamQueue) {
+ _completer = new StreamCompleter<T>();
+ queue = new StreamQueue<T>(_completer.stream);
+ }
+
+ bool addEvents(Queue<Result> events) {
+ _completeStream(events);
+ return true;
+ }
+
+ void close(Queue<Result> events) {
+ _completeStream(events);
+ }
+
+ void _completeStream(Queue<Result> events) {
+ if (events.isEmpty) {
+ if (_streamQueue._isDone) {
+ _completer.setEmpty();
+ } else {
+ _completer.setSourceStream(_streamQueue._sourceStream.fork());
+ }
+ } else {
+ // There are prefetched events which need to be added before the
+ // remaining stream.
+ var controller = new StreamController<T>();
+ for (var event in events) {
+ event.addTo(controller);
+ }
+
+ var fork = _streamQueue._sourceStream.fork();
+ controller.addStream(fork, cancelOnError: false)
+ .whenComplete(controller.close);
+ _completer.setSourceStream(controller.stream);
+ }
+ }
+}

Powered by Google App Engine
This is Rietveld 408576698