Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(680)

Unified Diff: lib/src/util/forkable_stream.dart

Issue 1262623006: Temporarily bring in code from the async package. (Closed) Base URL: git@github.com:dart-lang/test@master
Patch Set: Created 5 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/util/forkable_stream.dart
diff --git a/lib/src/util/forkable_stream.dart b/lib/src/util/forkable_stream.dart
new file mode 100644
index 0000000000000000000000000000000000000000..033d49821e860782293a15588c63506780b8991d
--- /dev/null
+++ b/lib/src/util/forkable_stream.dart
@@ -0,0 +1,168 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+// TODO(nweiz): Get rid of this when https://codereview.chromium.org/1241723003/
+// lands.
+library test.util.forkable_stream;
+
+import 'dart:async';
+
+import 'package:async/async.dart' hide ForkableStream;
+
+/// A single-subscription stream from which other streams may be forked off at
+/// the current position.
+///
+/// This adds an operation, [fork], which produces a new stream that
+/// independently emits the same events as this stream. Unlike the branches
+/// produced by [StreamSplitter], a fork only emits events that arrive *after*
+/// the call to [fork].
+///
+/// Each fork can be paused or canceled independently of one another and of this
+/// stream. The underlying stream will be listened to once any branch is
+/// listened to. It will be paused when all branches are paused or not yet
+/// listened to. It will be canceled when all branches have been listened to and
+/// then canceled.
+class ForkableStream<T> extends StreamView<T> {
+ /// The underlying stream.
+ final Stream _sourceStream;
+
+ /// The subscription to [_sourceStream].
+ ///
+ /// This will be `null` until this stream or any of its forks are listened to.
+ StreamSubscription _subscription;
+
+ /// Whether this has been cancelled and no more forks may be created.
+ bool _isCanceled = false;
+
+ /// The controllers for any branches that have not yet been canceled.
+ ///
+ /// This includes a controller for this stream, until that has been cancelled.
+ final _controllers = new Set<StreamController<T>>();
+
+ /// Creates a new forkable stream wrapping [sourceStream].
+ ForkableStream(Stream sourceStream)
+ // Use a completer here so that we can provide its stream to the
+ // superclass constructor while also adding the stream controller to
+ // [_controllers].
+ : this._(sourceStream, new StreamCompleter());
+
+ ForkableStream._(this._sourceStream, StreamCompleter completer)
+ : super(completer.stream) {
+ completer.setSourceStream(_fork(primary: true));
+ }
+
+ /// Creates a new fork of this stream.
+ ///
+ /// From this point forward, the fork will emit the same events as this
+ /// stream. It will *not* emit any events that have already been emitted by
+ /// this stream. The fork is independent of this stream, which means each one
+ /// may be paused or canceled without affecting the other.
+ ///
+ /// If this stream is done or its subscription has been canceled, this returns
+ /// an empty stream.
+ Stream<T> fork() => _fork(primary: false);
+
+ /// Creates a stream forwarding [_sourceStream].
+ ///
+ /// If [primary] is true, this is the stream underlying this object;
+ /// otherwise, it's a fork. The only difference is that when the primary
+ /// stream is canceled, [fork] starts throwing [StateError]s.
+ Stream<T> _fork({bool primary: false}) {
+ if (_isCanceled) {
+ var controller = new StreamController<T>()..close();
+ return controller.stream;
+ }
+
+ var controller;
+ controller = new StreamController<T>(
+ onListen: () => _onListenOrResume(controller),
+ onCancel: () => _onCancel(controller, primary: primary),
+ onPause: () => _onPause(controller),
+ onResume: () => _onListenOrResume(controller),
+ sync: true);
+
+ _controllers.add(controller);
+
+ return controller.stream;
+ }
+
+ /// The callback called when `onListen` or `onResume` is called for the branch
+ /// managed by [controller].
+ ///
+ /// This ensures that we're subscribed to [_sourceStream] and that the
+ /// subscription isn't paused.
+ void _onListenOrResume(StreamController<T> controller) {
+ if (controller.isClosed) return;
+ if (_subscription == null) {
+ _subscription =
+ _sourceStream.listen(_onData, onError: _onError, onDone: _onDone);
+ } else {
+ _subscription.resume();
+ }
+ }
+
+ /// The callback called when `onCancel` is called for the branch managed by
+ /// [controller].
+ ///
+ /// This cancels or pauses the underlying subscription as necessary. If
+ /// [primary] is true, it also ensures that future calls to [fork] throw
+ /// [StateError]s.
+ Future _onCancel(StreamController<T> controller, {bool primary: false}) {
+ if (primary) _isCanceled = true;
+
+ if (controller.isClosed) return null;
+ _controllers.remove(controller);
+
+ if (_controllers.isEmpty) return _subscription.cancel();
+
+ _onPause(controller);
+ return null;
+ }
+
+ /// The callback called when `onPause` is called for the branch managed by
+ /// [controller].
+ ///
+ /// This pauses the underlying subscription if necessary.
+ void _onPause(StreamController<T> controller) {
+ if (controller.isClosed) return;
+ if (_subscription.isPaused) return;
+ if (_controllers.any((controller) =>
+ controller.hasListener && !controller.isPaused)) {
+ return;
+ }
+
+ _subscription.pause();
+ }
+
+ /// Forwards data events to all branches.
+ void _onData(value) {
+ // Don't iterate directly over the set because [controller.add] might cause
+ // it to be modified synchronously.
+ for (var controller in _controllers.toList()) {
+ controller.add(value);
+ }
+ }
+
+ /// Forwards error events to all branches.
+ void _onError(error, StackTrace stackTrace) {
+ // Don't iterate directly over the set because [controller.addError] might
+ // cause it to be modified synchronously.
+ for (var controller in _controllers.toList()) {
+ controller.addError(error, stackTrace);
+ }
+ }
+
+ /// Forwards close events to all branches.
+ void _onDone() {
+ _isCanceled = true;
+
+ // Don't iterate directly over the set because [controller.close] might
+ // cause it to be modified synchronously.
+ for (var controller in _controllers.toList()) {
+ controller.close();
+ }
+ _controllers.clear();
+ }
+}
+

Powered by Google App Engine
This is Rietveld 408576698