Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.forkable_stream; | |
| 6 | |
| 7 import 'dart:async'; | |
| 8 | |
| 9 import 'stream_completer.dart'; | |
| 10 | |
| 11 /// A single-subscription stream from which other streams may be forked off at | |
| 12 /// the current position. | |
| 13 /// | |
| 14 /// This adds an operation, [fork], which produces a new stream that | |
| 15 /// independently emits the same events as this stream. Unlike the branches | |
| 16 /// produced by [StreamSplitter], a fork only emits events that arrive *after* | |
| 17 /// the call to [fork]. | |
| 18 /// | |
| 19 /// Each fork can be paused or canceled independently of one another and of this | |
| 20 /// stream. The underlying stream will be listened to once any branch is | |
| 21 /// listened to. It will be paused when all branches are paused or not yet | |
| 22 /// listened to. It will be canceled when all branches have been listened to and | |
| 23 /// then canceled. | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
It seems it will be cancelled immediately when the
nweiz
2015/07/15 22:19:43
Only if [_controllers] is empty—that is, there are
| |
| 24 class ForkableStream<T> extends StreamView<T> { | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
AFAICS this differs from just making it a broadcas
nweiz
2015/07/15 22:19:43
I think the idea of a forkable stream is clearer t
| |
| 25 /// The underlying stream. | |
| 26 final Stream _sourceStream; | |
| 27 | |
| 28 /// The subscription to [_sourceStream]. | |
| 29 /// | |
| 30 /// This will be `null` until this stream or any of its forks are listened to. | |
| 31 StreamSubscription _subscription; | |
| 32 | |
| 33 /// Whether this has been cancelled and no more forks may be created. | |
| 34 bool _isClosed = false; | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:12
We usually use "isClosed" for when "close" has bee
nweiz
2015/07/15 22:19:43
Done. I was trying to use it in the same sense as
| |
| 35 | |
| 36 /// The controllers for any branches that have not yet been canceled. | |
| 37 /// | |
| 38 /// This includes a controller for this stream, until that has been cancelled. | |
| 39 final _controllers = new Set<StreamController<T>>(); | |
| 40 | |
| 41 /// Creates a new forkable stream wrapping [sourceStream]. | |
| 42 ForkableStream(Stream sourceStream) | |
| 43 // Use a completer here so that we can provide its stream to the | |
| 44 // superclass constructor while also adding the stream controller to | |
| 45 // [_controllers]. | |
| 46 : this._(sourceStream, new StreamCompleter()); | |
| 47 | |
| 48 ForkableStream._(this._sourceStream, StreamCompleter completer) | |
| 49 : super(completer.stream) { | |
| 50 completer.setSourceStream(_fork(primary: true)); | |
| 51 } | |
| 52 | |
| 53 /// Creates a new fork of this stream. | |
| 54 /// | |
| 55 /// From this point forward, the fork will emit the same events as this | |
| 56 /// stream. It will *not* emit any events that have already been emitted by | |
| 57 /// this stream. The fork is independent of this stream, which means each one | |
| 58 /// may be paused or canceled without affecting the other. | |
| 59 /// | |
| 60 /// Throws a [StateError] if this stream is done or its subscription has been | |
| 61 /// canceled. | |
| 62 Stream<T> fork() => _fork(primary: false); | |
|
Lasse Reichstein Nielsen
2015/07/16 14:01:23
I don't think fork belongs on Stream.
A stream is
nweiz
2015/07/17 20:30:16
I disagree. It's important that the fork can be cr
| |
| 63 | |
| 64 /// Creates a stream forwarding [_sourceStream]. | |
| 65 /// | |
| 66 /// If [primary] is true, this is the stream underlying this object; | |
| 67 /// otherwise, it's a fork. The only difference is that when the primary | |
| 68 /// stream is canceled, [fork] starts throwing [StateError]s. | |
| 69 Stream<T> _fork({bool primary: false}) { | |
| 70 if (_isClosed) { | |
| 71 throw new StateError("Can't fork a closed or canceled stream."); | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
Alternatively just return an empty stream (new Str
nweiz
2015/07/15 22:19:43
Done.
| |
| 72 } | |
| 73 | |
| 74 var controller; | |
| 75 controller = new StreamController<T>( | |
| 76 onListen: () => _onListenOrResume(controller), | |
| 77 onCancel: () => _onCancel(controller, primary: primary), | |
| 78 onPause: () => _onPause(controller), | |
| 79 onResume: () => _onListenOrResume(controller), | |
| 80 sync: true); | |
| 81 | |
| 82 _controllers.add(controller); | |
| 83 | |
| 84 return controller.stream; | |
| 85 } | |
| 86 | |
| 87 /// The callback called when `onListen` or `onResume` is called for the branch | |
| 88 /// managed by [controller]. | |
| 89 /// | |
| 90 /// This ensures that we're subscribed to [_sourceStream] and that the | |
| 91 /// subscription isn't paused. | |
| 92 void _onListenOrResume(StreamController<T> controller) { | |
| 93 if (controller.isClosed) return; | |
| 94 if (_subscription == null) { | |
| 95 _subscription = | |
| 96 _sourceStream.listen(_onData, onError: _onError, onDone: _onDone); | |
| 97 } else { | |
| 98 _subscription.resume(); | |
| 99 } | |
| 100 } | |
| 101 | |
| 102 /// The callback called when `onCancel` is called for the branch managed by | |
| 103 /// [controller]. | |
| 104 /// | |
| 105 /// This cancels or pauses the underlying subscription as necessary. If | |
| 106 /// [primary] is true, it also ensures that future calls to [fork] throw | |
| 107 /// [StateError]s. | |
| 108 Future _onCancel(StreamController<T> controller, {bool primary: false}) { | |
| 109 if (primary) _isClosed = true; | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
So if the primary stream is canceled, you can't fo
nweiz
2015/07/15 22:19:43
That's right, although you can still fork if you w
| |
| 110 | |
| 111 if (controller.isClosed) return null; | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
A controller is only closed by the "onDone" handle
nweiz
2015/07/15 22:19:43
Not quite—while dispatching _onDone, it's possible
| |
| 112 _controllers.remove(controller); | |
| 113 | |
| 114 if (_controllers.isEmpty) return _subscription.cancel(); | |
| 115 | |
| 116 _onPause(controller); | |
| 117 return null; | |
| 118 } | |
| 119 | |
| 120 /// The callback called when `onPause` is called for the branch managed by | |
| 121 /// [controller]. | |
| 122 /// | |
| 123 /// This pauses the underlying subscription if necessary. | |
| 124 void _onPause(StreamController<T> controller) { | |
| 125 if (controller.isClosed) return; | |
| 126 if (_subscription.isPaused) return; | |
| 127 if (_controllers.any((controller) => | |
| 128 controller.hasListener && !controller.isPaused)) { | |
| 129 return; | |
| 130 } | |
| 131 | |
| 132 _subscription.pause(); | |
| 133 } | |
| 134 | |
| 135 /// Forwards data events to all branches. | |
| 136 void _onData(value) { | |
| 137 // Don't iterate directly over the set because [controller.add] might cause | |
| 138 // it to be modified synchronously. | |
| 139 for (var controller in _controllers.toSet()) { | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:12
I would use `toList()` here. It should have a lowe
nweiz
2015/07/15 22:19:43
Done.
| |
| 140 controller.add(value); | |
| 141 } | |
| 142 } | |
| 143 | |
| 144 /// Forwards error events to all branches. | |
| 145 void _onError(error, StackTrace stackTrace) { | |
| 146 // Don't iterate directly over the set because [controller.addError] might | |
| 147 // cause it to be modified synchronously. | |
| 148 for (var controller in _controllers.toSet()) { | |
| 149 controller.addError(error, stackTrace); | |
| 150 } | |
| 151 } | |
| 152 | |
| 153 /// Forwards close events to all branches. | |
| 154 void _onDone() { | |
| 155 // Don't iterate directly over the set because [controller.close] might | |
| 156 // cause it to be modified synchronously. | |
| 157 for (var controller in _controllers.toSet()) { | |
| 158 controller.close(); | |
|
Lasse Reichstein Nielsen
2015/07/15 20:07:11
You may have a race condition here.
The _isClosed
nweiz
2015/07/15 22:19:43
Because the controller is first and the controller
| |
| 159 } | |
| 160 _controllers.clear(); | |
| 161 } | |
| 162 } | |
| 163 | |
| OLD | NEW |