Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(222)

Unified Diff: packages/async/lib/src/stream_splitter.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/async/lib/src/stream_queue.dart ('k') | packages/async/lib/src/subscription_stream.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/async/lib/src/stream_splitter.dart
diff --git a/packages/async/lib/src/stream_splitter.dart b/packages/async/lib/src/stream_splitter.dart
new file mode 100644
index 0000000000000000000000000000000000000000..6ec440f5dd8e5c2c83754e73020f8f34a3e1852d
--- /dev/null
+++ b/packages/async/lib/src/stream_splitter.dart
@@ -0,0 +1,213 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_splitter;
+
+import 'dart:async';
+import 'dart:collection';
+
+import '../result.dart';
+import 'future_group.dart';
+
+/// A class that splits a single source stream into an arbitrary number of
+/// (single-subscription) streams (called "branch") that emit the same events.
+///
+/// Each branch will emit all the same values and errors as the source stream,
+/// regardless of which values have been emitted on other branches. This means
+/// that the splitter stores every event that has been emitted so far, which may
+/// consume a lot of memory. The user can call [close] to indicate that no more
+/// branches will be created, and this memory will be released.
+///
+/// The source stream is only listened to once a branch is created *and listened
+/// to*. It's paused when all branches are paused *or when all branches are
+/// canceled*, and resumed once there's at least one branch that's listening and
+/// unpaused. It's not canceled unless no branches are listening and [close] has
+/// been called.
+class StreamSplitter<T> {
+ /// The wrapped stream.
+ final Stream<T> _stream;
+
+ /// The subscription to [_stream].
+ ///
+ /// This will be `null` until a branch has a listener.
+ StreamSubscription<T> _subscription;
+
+ /// The buffer of events or errors that have already been emitted by
+ /// [_stream].
+ final _buffer = new List<Result<T>>();
+
+ /// The controllers for branches that are listening for future events from
+ /// [_stream].
+ ///
+ /// Once a branch is canceled, it's removed from this list. When [_stream] is
+ /// done, all branches are removed.
+ final _controllers = new Set<StreamController<T>>();
+
+ /// A group of futures returned by [close].
+ ///
+ /// This is used to ensure that [close] doesn't complete until all
+ /// [StreamController.close] and [StreamSubscription.cancel] calls complete.
+ final _closeGroup = new FutureGroup();
+
+ /// Whether [_stream] is done emitting events.
+ var _isDone = false;
+
+ /// Whether [close] has been called.
+ var _isClosed = false;
+
+ /// Splits [stream] into [count] identical streams.
+ ///
+ /// [count] defaults to 2. This is the same as creating [count] branches and
+ /// then closing the [StreamSplitter].
+ static List<Stream> splitFrom(Stream stream, [int count]) {
+ if (count == null) count = 2;
+ var splitter = new StreamSplitter(stream);
+ var streams = new List.generate(count, (_) => splitter.split());
+ splitter.close();
+ return streams;
+ }
+
+ StreamSplitter(this._stream);
+
+ /// Returns a single-subscription stream that's a copy of the input stream.
+ ///
+ /// This will throw a [StateError] if [close] has been called.
+ Stream<T> split() {
+ if (_isClosed) {
+ throw new StateError("Can't call split() on a closed StreamSplitter.");
+ }
+
+ var controller;
+ controller = new StreamController<T>(
+ onListen: _onListen,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: () => _onCancel(controller));
+
+ for (var result in _buffer) {
+ result.addTo(controller);
+ }
+
+ if (_isDone) {
+ _closeGroup.add(controller.close());
+ } else {
+ _controllers.add(controller);
+ }
+
+ return controller.stream;
+ }
+
+ /// Indicates that no more branches will be requested via [split].
+ ///
+ /// This clears the internal buffer of events. If there are no branches or all
+ /// branches have been canceled, this cancels the subscription to the input
+ /// stream.
+ ///
+ /// Returns a [Future] that completes once all events have been processed by
+ /// all branches and (if applicable) the subscription to the input stream has
+ /// been canceled.
+ Future close() {
+ if (_isClosed) return _closeGroup.future;
+ _isClosed = true;
+
+ _buffer.clear();
+ if (_controllers.isEmpty) _cancelSubscription();
+
+ return _closeGroup.future;
+ }
+
+ /// Cancel [_subscription] and close [_closeGroup].
+ ///
+ /// This should be called after all the branches' subscriptions have been
+ /// canceled and the splitter has been closed. In that case, we won't use the
+ /// events from [_subscription] any more, since there's nothing to pipe them
+ /// to and no more branches will be created. If [_subscription] is done,
+ /// canceling it will be a no-op.
+ ///
+ /// This may also be called before any branches have been created, in which
+ /// case [_subscription] will be `null`.
+ void _cancelSubscription() {
+ assert(_controllers.isEmpty);
+ assert(_isClosed);
+
+ var future = null;
+ if (_subscription != null) future = _subscription.cancel();
+ if (future != null) _closeGroup.add(future);
+ _closeGroup.close();
+ }
+
+ // StreamController events
+
+ /// Subscribe to [_stream] if we haven't yet done so, and resume the
+ /// subscription if we have.
+ void _onListen() {
+ if (_isDone) return;
+
+ if (_subscription != null) {
+ // Resume the subscription in case it was paused, either because all the
+ // controllers were paused or because the last one was canceled. If it
+ // wasn't paused, this will be a no-op.
+ _subscription.resume();
+ } else {
+ _subscription = _stream.listen(
+ _onData, onError: _onError, onDone: _onDone);
+ }
+ }
+
+ /// Pauses [_subscription] if every controller is paused.
+ void _onPause() {
+ if (!_controllers.every((controller) => controller.isPaused)) return;
+ _subscription.pause();
+ }
+
+ /// Resumes [_subscription].
+ ///
+ /// If [_subscription] wasn't paused, this is a no-op.
+ void _onResume() {
+ _subscription.resume();
+ }
+
+ /// Removes [controller] from [_controllers] and cancels or pauses
+ /// [_subscription] as appropriate.
+ ///
+ /// Since the controller emitting a done event will cause it to register as
+ /// canceled, this is the only way that a controller is ever removed from
+ /// [_controllers].
+ void _onCancel(StreamController controller) {
+ _controllers.remove(controller);
+ if (_controllers.isNotEmpty) return;
+
+ if (_isClosed) {
+ _cancelSubscription();
+ } else {
+ _subscription.pause();
+ }
+ }
+
+ // Stream events
+
+ /// Buffers [data] and passes it to [_controllers].
+ void _onData(T data) {
+ if (!_isClosed) _buffer.add(new Result.value(data));
+ for (var controller in _controllers) {
+ controller.add(data);
+ }
+ }
+
+ /// Buffers [error] and passes it to [_controllers].
+ void _onError(Object error, StackTrace stackTrace) {
+ if (!_isClosed) _buffer.add(new Result.error(error, stackTrace));
+ for (var controller in _controllers) {
+ controller.addError(error, stackTrace);
+ }
+ }
+
+ /// Marks [_controllers] as done.
+ void _onDone() {
+ _isDone = true;
+ for (var controller in _controllers) {
+ _closeGroup.add(controller.close());
+ }
+ }
+}
« no previous file with comments | « packages/async/lib/src/stream_queue.dart ('k') | packages/async/lib/src/subscription_stream.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698