Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(110)

Unified Diff: lib/src/stream_queue.dart

Issue 1238503004: Alternative implementation of StreamQueue.fork which has no impact if it Base URL: https://github.com/dart-lang/async@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « no previous file | lib/src/subscription_fork.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: lib/src/stream_queue.dart
diff --git a/lib/src/stream_queue.dart b/lib/src/stream_queue.dart
index b8941e1510b1956b022cc6ac0f5bfbb45645b599..c31698b3bed10d1818f640fc105c6002d3e4b962 100644
--- a/lib/src/stream_queue.dart
+++ b/lib/src/stream_queue.dart
@@ -7,7 +7,7 @@ library async.stream_events;
import 'dart:async';
import 'dart:collection';
-import "forkable_stream.dart";
+import "subscription_fork.dart";
import "subscription_stream.dart";
import "stream_completer.dart";
import "../result.dart";
@@ -79,7 +79,7 @@ class StreamQueue<T> {
// by the content of the fifth event.
/// Source of events.
- final ForkableStream _sourceStream;
+ final Stream _sourceStream;
/// Subscription on [_sourceStream] while listening for events.
///
@@ -105,9 +105,7 @@ class StreamQueue<T> {
/// Create a `StreamQueue` of the events of [source].
StreamQueue(Stream source)
- : _sourceStream = source is ForkableStream
- ? source
- : new ForkableStream(source);
+ : _sourceStream = source;
/// Asks if the stream has any more events.
///
@@ -230,6 +228,13 @@ class StreamQueue<T> {
StreamQueue<T> fork() {
if (_isClosed) throw _failClosed();
+ _ensureListening();
+ if (_subscription is! ForkableSubscription) {
+ _subscription = new ForkableSubscription<T>(_subscription);
+ _subscription.onData(_onData);
+ _subscription.onError(_onError);
+ _subscription.onDone(_onDone);
+ }
var request = new _ForkRequest<T>(this);
_addRequest(request);
return request.queue;
@@ -679,7 +684,9 @@ class _ForkRequest<T> implements _EventRequest {
if (_streamQueue._isDone) {
_completer.setEmpty();
} else {
- _completer.setSourceStream(_streamQueue._sourceStream.fork());
+ assert(_streamQueue._subscription is ForkableSubscription);
+ var stream = new SubscriptionStream<T>(_streamQueue._subscription.fork());
+ _completer.setSourceStream(stream);
}
} else {
// There are prefetched events which need to be added before the
@@ -689,7 +696,7 @@ class _ForkRequest<T> implements _EventRequest {
event.addTo(controller);
}
- var fork = _streamQueue._sourceStream.fork();
+ var fork = new SubscriptionStream(_streamQueue._subscription.fork());
controller.addStream(fork, cancelOnError: false)
.whenComplete(controller.close);
_completer.setSourceStream(controller.stream);
« no previous file with comments | « no previous file | lib/src/subscription_fork.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698