Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(3)

Unified Diff: lib/src/stream_group.dart

Issue 1178793006: Add a StreamGroup class for merging streams. (Closed) Base URL: git@github.com:dart-lang/async.git@master
Patch Set: Created 5 years, 6 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: lib/src/stream_group.dart
diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart
new file mode 100644
index 0000000000000000000000000000000000000000..d1b51c183c073c89ce632157b8431620625e8518
--- /dev/null
+++ b/lib/src/stream_group.dart
@@ -0,0 +1,269 @@
+// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file
+// for details. All rights reserved. Use of this source code is governed by a
+// BSD-style license that can be found in the LICENSE file.
+
+library async.stream_group;
+
+import 'dart:async';
+
+/// A collection of streams whose events are unified and sent through a central
+/// stream.
+///
+/// Both errors and data events are forwarded through [stream]. The streams in
+/// the group won't be listened to until [stream] has a listener. **Note that
+/// this means that events emitted by broadcast streams will be dropped until
+/// [stream] has a listener.**
+///
+/// [stream] will be a single-subscription stream for groups constructed with
Lasse Reichstein Nielsen 2015/06/18 10:29:31 I find "groups" confusing - using "a group" or "a
nweiz 2015/06/19 00:44:16 Reworded.
+/// [new StreamGroup]. In this case, if [stream] is paused or canceled, all
+/// streams in the group will likewise be paused or canceled, respectively.
+///
+/// [stream] will be a broadcast stream for groups constructed with [new
+/// StreamGroup.broadcast]. In this case, the streams in the group will never be
+/// paused and single-subscription streams in the group will never be canceled.
+/// **Note that single-subscription streams in a broadcast group may drop events
+/// if a listener is added and later removed.** Broadcast streams in the group
+/// will be canceled once [stream] has no listeners, and will be listened to
+/// again once [stream] has listeners.
+///
+/// [stream] won't close until [close] is called on the group *and* every stream
+/// in the group closes.
+class StreamGroup<T> implements Sink<Stream<T>> {
+ /// The stream through which all events from streams in the group are emitted.
+ Stream<T> get stream => _controller.stream;
+ StreamController<T> _controller;
+
+ /// Whether the group is closed, meaning that no more streams may be added.
+ var _closed = false;
+
+ /// The current state of the group.
+ ///
+ /// See [_StreamGroupState] for detailed descriptions of each state.
+ var _state = _StreamGroupState.dormant;
+
+ /// The streams that have been added to the group, but not yet listened to.
+ ///
+ /// This will be empty unless the group is dormant (that is, [listen] has not
+ /// yet been called). Once the group is listening, streams and their
+ /// subscriptions will be directly added to [_subscriptions] instead.
+ final _streams = new List<Stream<T>>();
+
+ /// Subscriptions to the streams that make up the group.
+ ///
+ /// The keys are the streams that are listened on, and the values are the
+ /// subscriptions to those streams. This will be empty as long as the group is
+ /// dormant.
Lasse Reichstein Nielsen 2015/06/18 10:29:31 It seems that it will still contain single-sub gro
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Perhaps state that the map is used to look up the
nweiz 2015/06/19 00:44:17 That's true, I'll state that explicitly.
+ final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>();
+
+ /// Merges the events from [streams] into a single (single-subscriber) stream.
+ ///
+ /// This is equivalent to adding [streams] to a group, closing that group, and
+ /// returning its stream.
+ static Stream merge(Iterable<Stream> streams) {
+ var group = new StreamGroup();
+ streams.forEach(group.add);
+ group.close();
+ return group.stream;
+ }
+
+ /// Creates a new stream group where [stream] is single-subscriber.
+ StreamGroup() {
+ _controller = new StreamController<T>(
+ onListen: _onListen,
+ onPause: _onPause,
+ onResume: _onResume,
+ onCancel: _onCancel,
+ sync: true);
+ }
+
+ /// Creates a new stream group where [stream] is a broadcast stream.
+ StreamGroup.broadcast() {
+ _controller = new StreamController<T>.broadcast(
+ onListen: _onListen,
+ onCancel: _onCancelBroadcast,
+ sync: true);
+ }
+
+ /// Adds [stream] as a member of this group.
+ ///
+ /// Any events from [stream] will be emitted through [this.stream]. If this
+ /// group has a listener, [stream] will be listened to immediately; otherwise
+ /// it will only be listened to once this group gets a listener.
+ ///
+ /// If this is a single-subscription group and its subscription has been
+ /// canceled, [stream] will be canceled as soon as its added. If this returns
+ /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`.
Lasse Reichstein Nielsen 2015/06/18 10:29:32 [Future]
nweiz 2015/06/19 00:44:16 Done.
+ ///
+ /// Throws a [StateError] if this group is closed.
+ Future add(Stream<T> stream) {
+ if (_closed) {
+ throw new StateError("Can't add a Stream to a closed StreamGroup.");
+ }
+
+ if (_state == _StreamGroupState.dormant) {
+ _streams.add(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Here you can add the same stream more than once wh
nweiz 2015/06/19 00:44:17 Good idea! This makes a lot of stuff cleaner.
+ } else if (_state == _StreamGroupState.canceled) {
+ // Listen to the stream and cancel it immediately so that no one else can
+ // listen, for consistency. If the stream has an onCancel listener this
+ // will also fire that, which may help it clean up resources.
+ return stream.listen(null).cancel();
Lasse Reichstein Nielsen 2015/06/18 10:29:31 The listen call can fail (for a single-sub stream
nweiz 2015/06/19 00:44:16 It would fail if the group weren't canceled, too,
+ } else {
+ _listenToStream(stream);
+ }
+
+ return null;
+ }
+
+ /// Removes [stream] as a member of this group.
+ ///
+ /// No further events from [stream] will be emitted through this group. If
+ /// [stream] has been listened to, its subscription will be canceled.
+ ///
+ /// If [stream] has been listened to, this *synchronously* cancels its
+ /// subscription. This means that any events from [stream] that haven't yet
+ /// been emitted through this group will not be, even if they were added to
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Maybe drop the ", even if ..." part, I think it on
nweiz 2015/06/19 00:44:17 Done.
+ /// [stream] before it was removed.
+ ///
+ /// If [stream]'s subscription was canceled, this returns
Lasse Reichstein Nielsen 2015/06/18 10:29:31 was => is
nweiz 2015/06/19 00:44:17 Done.
+ /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`.
+ Future remove(Stream<T> stream) {
+ if (_state == _StreamGroupState.dormant) {
+ _streams.remove(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 If this is a broadcast group and a single-sub stre
nweiz 2015/06/19 00:44:17 Merging _subscriptions and _streams made this prob
+ if (_closed && _streams.isEmpty) _controller.close();
+ return null;
+ }
+
+ var subscription = _subscriptions.remove(stream);
+ var future = subscription == null ? null : subscription.cancel();
+ if (_closed && _subscriptions.isEmpty) _controller.close();
+ return future;
+ }
+
+ /// A callback called when [stream] is listened to.
+ ///
+ /// This is called for both single-subscription and broadcast groups.
+ void _onListen() {
+ _state = _StreamGroupState.listening;
+
+ // Now that the group is marked as listening, [add] will actually subscribe
+ // rather than adding a stream to [_streams].
+ _streams.forEach(_listenToStream);
+ _streams.clear();
+ }
+
+ /// A callback called when [stream] is paused.
+ void _onPause() {
+ _state = _StreamGroupState.paused;
+ for (var subscription in _subscriptions.values) {
+ subscription.pause();
+ }
+ }
+
+ /// A callback called when [stream] is resumed.
+ void _onResume() {
+ _state = _StreamGroupState.listening;
+ for (var subscription in _subscriptions.values) {
+ subscription.resume();
+ }
+ }
+
+ /// A callback called when [stream] is canceled.
+ ///
+ /// This is only called for single-subscription groups.
+ Future _onCancel() {
+ _state = _StreamGroupState.canceled;
+
+ var futures = _subscriptions.values
+ .map((subscription) => subscription.cancel())
+ .where((future) => future != null)
+ .toList();
+
+ _subscriptions.clear();
+ return futures.isEmpty ? null : Future.wait(futures);
+ }
+
+ /// A callback called when [stream] is canceled.
Lasse Reichstein Nielsen 2015/06/18 10:29:31 is canceled -> 's last listener is canceled ? Bro
nweiz 2015/06/19 00:44:17 Done.
+ ///
+ /// This is only called for broadcast groups.
+ void _onCancelBroadcast() {
+ _state = _StreamGroupState.dormant;
+
+ for (var stream in _subscriptions.keys.toList()) {
+ if (stream.isBroadcast) _subscriptions.remove(stream).cancel();
+ _streams.add(stream);
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Neat, cancel the broadcast streams, but let the si
nweiz 2015/06/19 00:44:17 Done.
+ }
+ }
+
+ /// Starts actively forwarding events from [stream] to [_controller].
+ ///
+ /// This will add the resulting subscription to [_subscriptions] and pause it
+ /// if [this] is paused.
+ void _listenToStream(Stream stream) {
+ _subscriptions.putIfAbsent(stream, () {
+ var subscription = stream.listen(
+ _controller.add,
+ onError: _controller.addError,
+ onDone: () => remove(stream));
+ if (_state == _StreamGroupState.paused) subscription.pause();
+ return subscription;
+ });
+ }
+
+ /// Closes the group, indicating that no more streams will be added.
+ ///
+ /// If there are no streams in the group, [stream] is closed immediately.
+ /// Otherwise, [stream] will close once all streams in the group close.
+ ///
+ /// Returns a [Future] that completes once [stream] has actually been closed.
+ Future close() {
+ if (_closed) return _controller.done;
+
+ _closed = true;
+ if (_state == _StreamGroupState.dormant) {
+ if (_streams.isEmpty) _controller.close();
Lasse Reichstein Nielsen 2015/06/18 10:29:31 If you are dormant in a broadcast stream with a si
nweiz 2015/06/19 00:44:16 Fixed by merging _subscriptions and _streams.
+ } else if (_subscriptions.isEmpty) {
+ _controller.close();
+ }
+
+ return _controller.done;
+ }
+}
+
+/// An enum of possible states of a [StreamGroup].
Lasse Reichstein Nielsen 2015/06/18 10:29:31 Would a real enum work? (Not that I particularly l
nweiz 2015/06/19 00:44:16 Yes, technically, but I tend to avoid those on pri
+class _StreamGroupState {
+ /// The group has no listeners.
+ ///
+ /// New streams added to the group will be listened once the group has a
+ /// listener.
+ static const dormant = const _StreamGroupState("dormant");
+
+ /// The group has one or more listeners and is actively firing events.
+ ///
+ /// New streams added to the group will be immediately listeners.
+ static const listening = const _StreamGroupState("listening");
+
+ /// The group is paused and no more events will be fired until it resumes.
+ ///
+ /// New streams added to the group will be listened to, but then paused. They
+ /// will be resumed once the group itself is resumed.
+ ///
+ /// This state is only used by single-subscriber groups.
+ static const paused = const _StreamGroupState("paused");
+
+ /// The group is canceled and no more events will be fired ever.
+ ///
+ /// New streams added to the group will be listened to, canceled, and
+ /// discarded.
+ ///
+ /// This state is only used by single-subscriber groups.
+ static const canceled = const _StreamGroupState("canceled");
+
+ /// The name of the state.
+ ///
+ /// Used for debugging.
+ final String name;
+
+ const _StreamGroupState(this.name);
+
+ String toString() => name;
+}
« no previous file with comments | « lib/async.dart ('k') | test/stream_group_test.dart » ('j') | test/stream_group_test.dart » ('J')

Powered by Google App Engine
This is Rietveld 408576698