Chromium Code Reviews| Index: lib/src/stream_group.dart |
| diff --git a/lib/src/stream_group.dart b/lib/src/stream_group.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..d1b51c183c073c89ce632157b8431620625e8518 |
| --- /dev/null |
| +++ b/lib/src/stream_group.dart |
| @@ -0,0 +1,269 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.stream_group; |
| + |
| +import 'dart:async'; |
| + |
| +/// A collection of streams whose events are unified and sent through a central |
| +/// stream. |
| +/// |
| +/// Both errors and data events are forwarded through [stream]. The streams in |
| +/// the group won't be listened to until [stream] has a listener. **Note that |
| +/// this means that events emitted by broadcast streams will be dropped until |
| +/// [stream] has a listener.** |
| +/// |
| +/// [stream] will be a single-subscription stream for groups constructed with |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
I find "groups" confusing - using "a group" or "a
nweiz
2015/06/19 00:44:16
Reworded.
|
| +/// [new StreamGroup]. In this case, if [stream] is paused or canceled, all |
| +/// streams in the group will likewise be paused or canceled, respectively. |
| +/// |
| +/// [stream] will be a broadcast stream for groups constructed with [new |
| +/// StreamGroup.broadcast]. In this case, the streams in the group will never be |
| +/// paused and single-subscription streams in the group will never be canceled. |
| +/// **Note that single-subscription streams in a broadcast group may drop events |
| +/// if a listener is added and later removed.** Broadcast streams in the group |
| +/// will be canceled once [stream] has no listeners, and will be listened to |
| +/// again once [stream] has listeners. |
| +/// |
| +/// [stream] won't close until [close] is called on the group *and* every stream |
| +/// in the group closes. |
| +class StreamGroup<T> implements Sink<Stream<T>> { |
| + /// The stream through which all events from streams in the group are emitted. |
| + Stream<T> get stream => _controller.stream; |
| + StreamController<T> _controller; |
| + |
| + /// Whether the group is closed, meaning that no more streams may be added. |
| + var _closed = false; |
| + |
| + /// The current state of the group. |
| + /// |
| + /// See [_StreamGroupState] for detailed descriptions of each state. |
| + var _state = _StreamGroupState.dormant; |
| + |
| + /// The streams that have been added to the group, but not yet listened to. |
| + /// |
| + /// This will be empty unless the group is dormant (that is, [listen] has not |
| + /// yet been called). Once the group is listening, streams and their |
| + /// subscriptions will be directly added to [_subscriptions] instead. |
| + final _streams = new List<Stream<T>>(); |
| + |
| + /// Subscriptions to the streams that make up the group. |
| + /// |
| + /// The keys are the streams that are listened on, and the values are the |
| + /// subscriptions to those streams. This will be empty as long as the group is |
| + /// dormant. |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
It seems that it will still contain single-sub gro
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Perhaps state that the map is used to look up the
nweiz
2015/06/19 00:44:17
That's true, I'll state that explicitly.
|
| + final _subscriptions = new Map<Stream<T>, StreamSubscription<T>>(); |
| + |
| + /// Merges the events from [streams] into a single (single-subscriber) stream. |
| + /// |
| + /// This is equivalent to adding [streams] to a group, closing that group, and |
| + /// returning its stream. |
| + static Stream merge(Iterable<Stream> streams) { |
| + var group = new StreamGroup(); |
| + streams.forEach(group.add); |
| + group.close(); |
| + return group.stream; |
| + } |
| + |
| + /// Creates a new stream group where [stream] is single-subscriber. |
| + StreamGroup() { |
| + _controller = new StreamController<T>( |
| + onListen: _onListen, |
| + onPause: _onPause, |
| + onResume: _onResume, |
| + onCancel: _onCancel, |
| + sync: true); |
| + } |
| + |
| + /// Creates a new stream group where [stream] is a broadcast stream. |
| + StreamGroup.broadcast() { |
| + _controller = new StreamController<T>.broadcast( |
| + onListen: _onListen, |
| + onCancel: _onCancelBroadcast, |
| + sync: true); |
| + } |
| + |
| + /// Adds [stream] as a member of this group. |
| + /// |
| + /// Any events from [stream] will be emitted through [this.stream]. If this |
| + /// group has a listener, [stream] will be listened to immediately; otherwise |
| + /// it will only be listened to once this group gets a listener. |
| + /// |
| + /// If this is a single-subscription group and its subscription has been |
| + /// canceled, [stream] will be canceled as soon as its added. If this returns |
| + /// a Future, it will be returned from [add]. Otherwise, [add] returns `null`. |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:32
[Future]
nweiz
2015/06/19 00:44:16
Done.
|
| + /// |
| + /// Throws a [StateError] if this group is closed. |
| + Future add(Stream<T> stream) { |
| + if (_closed) { |
| + throw new StateError("Can't add a Stream to a closed StreamGroup."); |
| + } |
| + |
| + if (_state == _StreamGroupState.dormant) { |
| + _streams.add(stream); |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Here you can add the same stream more than once wh
nweiz
2015/06/19 00:44:17
Good idea! This makes a lot of stuff cleaner.
|
| + } else if (_state == _StreamGroupState.canceled) { |
| + // Listen to the stream and cancel it immediately so that no one else can |
| + // listen, for consistency. If the stream has an onCancel listener this |
| + // will also fire that, which may help it clean up resources. |
| + return stream.listen(null).cancel(); |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
The listen call can fail (for a single-sub stream
nweiz
2015/06/19 00:44:16
It would fail if the group weren't canceled, too,
|
| + } else { |
| + _listenToStream(stream); |
| + } |
| + |
| + return null; |
| + } |
| + |
| + /// Removes [stream] as a member of this group. |
| + /// |
| + /// No further events from [stream] will be emitted through this group. If |
| + /// [stream] has been listened to, its subscription will be canceled. |
| + /// |
| + /// If [stream] has been listened to, this *synchronously* cancels its |
| + /// subscription. This means that any events from [stream] that haven't yet |
| + /// been emitted through this group will not be, even if they were added to |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Maybe drop the ", even if ..." part, I think it on
nweiz
2015/06/19 00:44:17
Done.
|
| + /// [stream] before it was removed. |
| + /// |
| + /// If [stream]'s subscription was canceled, this returns |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
was => is
nweiz
2015/06/19 00:44:17
Done.
|
| + /// [StreamSubscription.cancel]'s return value. Otherwise, it returns `null`. |
| + Future remove(Stream<T> stream) { |
| + if (_state == _StreamGroupState.dormant) { |
| + _streams.remove(stream); |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
If this is a broadcast group and a single-sub stre
nweiz
2015/06/19 00:44:17
Merging _subscriptions and _streams made this prob
|
| + if (_closed && _streams.isEmpty) _controller.close(); |
| + return null; |
| + } |
| + |
| + var subscription = _subscriptions.remove(stream); |
| + var future = subscription == null ? null : subscription.cancel(); |
| + if (_closed && _subscriptions.isEmpty) _controller.close(); |
| + return future; |
| + } |
| + |
| + /// A callback called when [stream] is listened to. |
| + /// |
| + /// This is called for both single-subscription and broadcast groups. |
| + void _onListen() { |
| + _state = _StreamGroupState.listening; |
| + |
| + // Now that the group is marked as listening, [add] will actually subscribe |
| + // rather than adding a stream to [_streams]. |
| + _streams.forEach(_listenToStream); |
| + _streams.clear(); |
| + } |
| + |
| + /// A callback called when [stream] is paused. |
| + void _onPause() { |
| + _state = _StreamGroupState.paused; |
| + for (var subscription in _subscriptions.values) { |
| + subscription.pause(); |
| + } |
| + } |
| + |
| + /// A callback called when [stream] is resumed. |
| + void _onResume() { |
| + _state = _StreamGroupState.listening; |
| + for (var subscription in _subscriptions.values) { |
| + subscription.resume(); |
| + } |
| + } |
| + |
| + /// A callback called when [stream] is canceled. |
| + /// |
| + /// This is only called for single-subscription groups. |
| + Future _onCancel() { |
| + _state = _StreamGroupState.canceled; |
| + |
| + var futures = _subscriptions.values |
| + .map((subscription) => subscription.cancel()) |
| + .where((future) => future != null) |
| + .toList(); |
| + |
| + _subscriptions.clear(); |
| + return futures.isEmpty ? null : Future.wait(futures); |
| + } |
| + |
| + /// A callback called when [stream] is canceled. |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
is canceled -> 's last listener is canceled
?
Bro
nweiz
2015/06/19 00:44:17
Done.
|
| + /// |
| + /// This is only called for broadcast groups. |
| + void _onCancelBroadcast() { |
| + _state = _StreamGroupState.dormant; |
| + |
| + for (var stream in _subscriptions.keys.toList()) { |
| + if (stream.isBroadcast) _subscriptions.remove(stream).cancel(); |
| + _streams.add(stream); |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Neat, cancel the broadcast streams, but let the si
nweiz
2015/06/19 00:44:17
Done.
|
| + } |
| + } |
| + |
| + /// Starts actively forwarding events from [stream] to [_controller]. |
| + /// |
| + /// This will add the resulting subscription to [_subscriptions] and pause it |
| + /// if [this] is paused. |
| + void _listenToStream(Stream stream) { |
| + _subscriptions.putIfAbsent(stream, () { |
| + var subscription = stream.listen( |
| + _controller.add, |
| + onError: _controller.addError, |
| + onDone: () => remove(stream)); |
| + if (_state == _StreamGroupState.paused) subscription.pause(); |
| + return subscription; |
| + }); |
| + } |
| + |
| + /// Closes the group, indicating that no more streams will be added. |
| + /// |
| + /// If there are no streams in the group, [stream] is closed immediately. |
| + /// Otherwise, [stream] will close once all streams in the group close. |
| + /// |
| + /// Returns a [Future] that completes once [stream] has actually been closed. |
| + Future close() { |
| + if (_closed) return _controller.done; |
| + |
| + _closed = true; |
| + if (_state == _StreamGroupState.dormant) { |
| + if (_streams.isEmpty) _controller.close(); |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
If you are dormant in a broadcast stream with a si
nweiz
2015/06/19 00:44:16
Fixed by merging _subscriptions and _streams.
|
| + } else if (_subscriptions.isEmpty) { |
| + _controller.close(); |
| + } |
| + |
| + return _controller.done; |
| + } |
| +} |
| + |
| +/// An enum of possible states of a [StreamGroup]. |
|
Lasse Reichstein Nielsen
2015/06/18 10:29:31
Would a real enum work?
(Not that I particularly l
nweiz
2015/06/19 00:44:16
Yes, technically, but I tend to avoid those on pri
|
| +class _StreamGroupState { |
| + /// The group has no listeners. |
| + /// |
| + /// New streams added to the group will be listened once the group has a |
| + /// listener. |
| + static const dormant = const _StreamGroupState("dormant"); |
| + |
| + /// The group has one or more listeners and is actively firing events. |
| + /// |
| + /// New streams added to the group will be immediately listeners. |
| + static const listening = const _StreamGroupState("listening"); |
| + |
| + /// The group is paused and no more events will be fired until it resumes. |
| + /// |
| + /// New streams added to the group will be listened to, but then paused. They |
| + /// will be resumed once the group itself is resumed. |
| + /// |
| + /// This state is only used by single-subscriber groups. |
| + static const paused = const _StreamGroupState("paused"); |
| + |
| + /// The group is canceled and no more events will be fired ever. |
| + /// |
| + /// New streams added to the group will be listened to, canceled, and |
| + /// discarded. |
| + /// |
| + /// This state is only used by single-subscriber groups. |
| + static const canceled = const _StreamGroupState("canceled"); |
| + |
| + /// The name of the state. |
| + /// |
| + /// Used for debugging. |
| + final String name; |
| + |
| + const _StreamGroupState(this.name); |
| + |
| + String toString() => name; |
| +} |