Chromium Code Reviews| Index: lib/src/forkable_stream.dart |
| diff --git a/lib/src/forkable_stream.dart b/lib/src/forkable_stream.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..4c4d68246a9fc423cc372c4a5d7c156b4134e487 |
| --- /dev/null |
| +++ b/lib/src/forkable_stream.dart |
| @@ -0,0 +1,163 @@ |
| +// Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +library async.forkable_stream; |
| + |
| +import 'dart:async'; |
| + |
| +import 'stream_completer.dart'; |
| + |
| +/// A single-subscription stream from which other streams may be forked off at |
| +/// the current position. |
| +/// |
| +/// This adds an operation, [fork], which produces a new stream that |
| +/// independently emits the same events as this stream. Unlike the branches |
| +/// produced by [StreamSplitter], a fork only emits events that arrive *after* |
| +/// the call to [fork]. |
| +/// |
| +/// Each fork can be paused or canceled independently of one another and of this |
| +/// stream. The underlying stream will be listened to once any branch is |
| +/// listened to. It will be paused when all branches are paused or not yet |
| +/// listened to. It will be canceled when all branches have been listened to and |
| +/// then canceled. |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
It seems it will be cancelled immediately when the
nweiz
2015/07/15 22:19:43
Only if [_controllers] is empty—that is, there are
|
| +class ForkableStream<T> extends StreamView<T> { |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
AFAICS this differs from just making it a broadcas
nweiz
2015/07/15 22:19:43
I think the idea of a forkable stream is clearer t
|
| + /// The underlying stream. |
| + final Stream _sourceStream; |
| + |
| + /// The subscription to [_sourceStream]. |
| + /// |
| + /// This will be `null` until this stream or any of its forks are listened to. |
| + StreamSubscription _subscription; |
| + |
| + /// Whether this has been cancelled and no more forks may be created. |
| + bool _isClosed = false; |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:12
We usually use "isClosed" for when "close" has bee
nweiz
2015/07/15 22:19:43
Done. I was trying to use it in the same sense as
|
| + |
| + /// The controllers for any branches that have not yet been canceled. |
| + /// |
| + /// This includes a controller for this stream, until that has been cancelled. |
| + final _controllers = new Set<StreamController<T>>(); |
| + |
| + /// Creates a new forkable stream wrapping [sourceStream]. |
| + ForkableStream(Stream sourceStream) |
| + // Use a completer here so that we can provide its stream to the |
| + // superclass constructor while also adding the stream controller to |
| + // [_controllers]. |
| + : this._(sourceStream, new StreamCompleter()); |
| + |
| + ForkableStream._(this._sourceStream, StreamCompleter completer) |
| + : super(completer.stream) { |
| + completer.setSourceStream(_fork(primary: true)); |
| + } |
| + |
| + /// Creates a new fork of this stream. |
| + /// |
| + /// From this point forward, the fork will emit the same events as this |
| + /// stream. It will *not* emit any events that have already been emitted by |
| + /// this stream. The fork is independent of this stream, which means each one |
| + /// may be paused or canceled without affecting the other. |
| + /// |
| + /// Throws a [StateError] if this stream is done or its subscription has been |
| + /// canceled. |
| + Stream<T> fork() => _fork(primary: false); |
|
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I don't think fork belongs on Stream.
A stream is
nweiz
2015/07/17 20:30:16
I disagree. It's important that the fork can be cr
|
| + |
| + /// Creates a stream forwarding [_sourceStream]. |
| + /// |
| + /// If [primary] is true, this is the stream underlying this object; |
| + /// otherwise, it's a fork. The only difference is that when the primary |
| + /// stream is canceled, [fork] starts throwing [StateError]s. |
| + Stream<T> _fork({bool primary: false}) { |
| + if (_isClosed) { |
| + throw new StateError("Can't fork a closed or canceled stream."); |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
Alternatively just return an empty stream (new Str
nweiz
2015/07/15 22:19:43
Done.
|
| + } |
| + |
| + var controller; |
| + controller = new StreamController<T>( |
| + onListen: () => _onListenOrResume(controller), |
| + onCancel: () => _onCancel(controller, primary: primary), |
| + onPause: () => _onPause(controller), |
| + onResume: () => _onListenOrResume(controller), |
| + sync: true); |
| + |
| + _controllers.add(controller); |
| + |
| + return controller.stream; |
| + } |
| + |
| + /// The callback called when `onListen` or `onResume` is called for the branch |
| + /// managed by [controller]. |
| + /// |
| + /// This ensures that we're subscribed to [_sourceStream] and that the |
| + /// subscription isn't paused. |
| + void _onListenOrResume(StreamController<T> controller) { |
| + if (controller.isClosed) return; |
| + if (_subscription == null) { |
| + _subscription = |
| + _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); |
| + } else { |
| + _subscription.resume(); |
| + } |
| + } |
| + |
| + /// The callback called when `onCancel` is called for the branch managed by |
| + /// [controller]. |
| + /// |
| + /// This cancels or pauses the underlying subscription as necessary. If |
| + /// [primary] is true, it also ensures that future calls to [fork] throw |
| + /// [StateError]s. |
| + Future _onCancel(StreamController<T> controller, {bool primary: false}) { |
| + if (primary) _isClosed = true; |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
So if the primary stream is canceled, you can't fo
nweiz
2015/07/15 22:19:43
That's right, although you can still fork if you w
|
| + |
| + if (controller.isClosed) return null; |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
A controller is only closed by the "onDone" handle
nweiz
2015/07/15 22:19:43
Not quite—while dispatching _onDone, it's possible
|
| + _controllers.remove(controller); |
| + |
| + if (_controllers.isEmpty) return _subscription.cancel(); |
| + |
| + _onPause(controller); |
| + return null; |
| + } |
| + |
| + /// The callback called when `onPause` is called for the branch managed by |
| + /// [controller]. |
| + /// |
| + /// This pauses the underlying subscription if necessary. |
| + void _onPause(StreamController<T> controller) { |
| + if (controller.isClosed) return; |
| + if (_subscription.isPaused) return; |
| + if (_controllers.any((controller) => |
| + controller.hasListener && !controller.isPaused)) { |
| + return; |
| + } |
| + |
| + _subscription.pause(); |
| + } |
| + |
| + /// Forwards data events to all branches. |
| + void _onData(value) { |
| + // Don't iterate directly over the set because [controller.add] might cause |
| + // it to be modified synchronously. |
| + for (var controller in _controllers.toSet()) { |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:12
I would use `toList()` here. It should have a lowe
nweiz
2015/07/15 22:19:43
Done.
|
| + controller.add(value); |
| + } |
| + } |
| + |
| + /// Forwards error events to all branches. |
| + void _onError(error, StackTrace stackTrace) { |
| + // Don't iterate directly over the set because [controller.addError] might |
| + // cause it to be modified synchronously. |
| + for (var controller in _controllers.toSet()) { |
| + controller.addError(error, stackTrace); |
| + } |
| + } |
| + |
| + /// Forwards close events to all branches. |
| + void _onDone() { |
| + // Don't iterate directly over the set because [controller.close] might |
| + // cause it to be modified synchronously. |
| + for (var controller in _controllers.toSet()) { |
| + controller.close(); |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
You may have a race condition here.
The _isClosed
nweiz
2015/07/15 22:19:43
Because the controller is first and the controller
|
| + } |
| + _controllers.clear(); |
| + } |
| +} |
| + |