Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 library async.stream_completer; | |
| 6 | |
| 7 import "dart:async"; | |
| 8 | |
| 9 /// A single-subscription [stream] where the contents are provided later. | |
| 10 /// | |
| 11 /// It is generally recommended that you never create a `Future<Stream>` | |
| 12 /// because you can just directly create a stream that doesn't do anything | |
| 13 /// until it's ready to do so. | |
| 14 /// This class can be used to create such a stream. | |
| 15 /// | |
| 16 /// The [stream] is a normal stream that you can listen to immediately, | |
| 17 /// but until either [setSourceStream] or [setEmpty] is called, | |
| 18 /// the stream won't produce any events. | |
| 19 /// | |
| 20 /// The same effect can be achieved by using a [StreamController] | |
| 21 /// and adding the stream using `addStream` when both | |
| 22 /// the controller's stream is listened to and the source stream is ready. | |
| 23 /// This class attempts to shortcut some of the overhead when possible. | |
| 24 /// For example, if the [stream] is only listened to | |
| 25 /// after the source stream has been set, | |
| 26 /// the listen is performed directly on the source stream. | |
| 27 class StreamCompleter<T> { | |
| 28 /// The stream doing the actual work, is returned by [stream]. | |
| 29 final _CompleterStream _stream = new _CompleterStream<T>(); | |
| 30 | |
| 31 /// Convert a `Future<Stream>` to a `Stream`. | |
| 32 /// | |
| 33 /// This creates a stream using a stream completer, | |
| 34 /// and sets the source stream to the result of the future when the | |
| 35 /// future completes. | |
| 36 /// | |
| 37 /// If the future completes with an error, the returned stream will | |
| 38 /// instead contain just that error. | |
| 39 static Stream fromFuture(Future<Stream> streamFuture) { | |
| 40 var completer = new StreamCompleter(); | |
| 41 streamFuture.then(completer.setSourceStream, | |
| 42 onError: (e, s) { | |
| 43 completer.setSourceStream(streamFuture.asStream()); | |
| 44 }); | |
| 45 return completer.stream; | |
| 46 } | |
| 47 | |
| 48 /// The stream of this completer. | |
| 49 /// | |
| 50 /// When a source stream is provided, its events will be forwarded to | |
| 51 /// listeners on this stream. | |
| 52 /// | |
| 53 /// The stream can be listened either before or after a source stream | |
| 54 /// is set. | |
|
nweiz
2015/06/30 23:39:48
Document that this is always single-subscription,
Lasse Reichstein Nielsen
2015/07/01 08:24:57
Done.
| |
| 55 Stream<T> get stream => _stream; | |
| 56 | |
| 57 /// Set a stream as the source of events for the [StreamCompleter]'s | |
| 58 /// [stream]. | |
| 59 /// | |
| 60 /// The completer's `stream` will act exactly as [sourceStream]. | |
| 61 /// | |
| 62 /// If the source stream is set before [stream] is listened to, | |
| 63 /// the listen call on [stream] is forwarded directly to [sourceStream]. | |
| 64 /// | |
| 65 /// If [stream] is listened to before setting the source stream, | |
| 66 /// an intermediate subscription is created. It looks like a completely | |
| 67 /// normal subscription, and can be paused or canceled, but it won't | |
| 68 /// produce any events until a source stream is provided. | |
| 69 /// | |
| 70 /// If the `stream` subscription is canceled before a source stream is set, | |
| 71 /// the source stream will be listened to and immediately canceled again. | |
| 72 /// | |
| 73 /// Otherwise, when the source stream is then set, | |
| 74 /// it is immediately listened to, and its events are forwarded to the | |
| 75 /// existing subscription. | |
| 76 /// | |
| 77 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
| 78 /// Trying to call either of them again will fail. | |
| 79 void setSourceStream(Stream<T> sourceStream) { | |
| 80 if (_stream._isSourceStreamSet) { | |
| 81 throw new StateError("Source stream already set"); | |
| 82 } | |
| 83 _stream._setSourceStream(sourceStream); | |
| 84 } | |
| 85 | |
| 86 /// Equivalent to setting an empty stream using [setSourceStream]. | |
| 87 /// | |
| 88 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
| 89 /// Trying to call either of them again will fail. | |
| 90 void setEmpty() { | |
| 91 if (_stream._isSourceStreamSet) { | |
| 92 throw new StateError("Source stream already set"); | |
| 93 } | |
| 94 _stream._setEmpty(); | |
| 95 } | |
| 96 } | |
| 97 | |
| 98 /// Stream completed by [StreamCompleter]. | |
| 99 class _CompleterStream<T> extends Stream<T> { | |
| 100 /// Controller for an intermediate stream. | |
| 101 /// | |
| 102 /// Created if the user listens on this stream before the source stream | |
| 103 /// is set, or if using [_setEmpty] so there is no source stream. | |
| 104 StreamController _controller; | |
| 105 | |
| 106 /// Source stream for the events provided by this stream. | |
| 107 /// | |
| 108 /// Set when the completer sets the source stream using [_setSourceStream] | |
| 109 /// or [_setEmpty]. | |
| 110 Stream _sourceStream; | |
| 111 | |
| 112 /// Completer for future returned by [setSourceStream]. | |
| 113 /// | |
| 114 /// The future is completed when the stream has been added or canceled, | |
| 115 /// and if canceling causes an error, the future is completed with that error. | |
| 116 final Completer _doneCompleter = new Completer(); | |
|
nweiz
2015/06/30 23:39:48
It looks like this is never completed.
Lasse Reichstein Nielsen
2015/07/01 08:24:57
True. It should be completed when the source strea
| |
| 117 | |
| 118 StreamSubscription<T> listen(onData(T data), | |
| 119 {Function onError, | |
| 120 void onDone(), | |
| 121 bool cancelOnError}) { | |
| 122 if (_controller == null) { | |
| 123 if (_sourceStream != null && !_sourceStream.isBroadcast) { | |
| 124 // If the source stream is itself single subscription, | |
| 125 // just listen to it directly instead of creating a controller. | |
| 126 return _sourceStream.listen(onData, onError: onError, onDone: onDone, | |
| 127 cancelOnError: cancelOnError); | |
| 128 } | |
| 129 _createController(); | |
| 130 if (_sourceStream != null) { | |
| 131 _linkStreamToController(); | |
| 132 } | |
| 133 } | |
| 134 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
| 135 cancelOnError: cancelOnError); | |
| 136 } | |
| 137 | |
| 138 /// Whether a source stream has been set. | |
| 139 /// | |
| 140 /// Used to throw an error if trying to set a source stream twice. | |
| 141 bool get _isSourceStreamSet => _sourceStream != null; | |
| 142 | |
| 143 /// Sets the source stream providing the events for this stream. | |
| 144 /// | |
| 145 /// If set before the user listens, listen calls will be directed directly | |
| 146 /// to the source stream. If the user listenes earlier, and intermediate | |
| 147 /// stream is created using a stream controller, and the source stream is | |
| 148 /// linked into that stream later. | |
| 149 Future _setSourceStream(Stream<T> sourceStream) { | |
|
nweiz
2015/06/30 23:39:47
It looks like this return value is unused. Should
Lasse Reichstein Nielsen
2015/07/01 08:24:57
I removed the _doneCompleter entirely. If someone
| |
| 150 assert(_sourceStream == null); | |
| 151 _sourceStream = sourceStream; | |
| 152 if (_controller != null) { | |
| 153 // User has already listened, so provide the data through controller. | |
| 154 _linkStreamToController(); | |
| 155 } | |
| 156 return _doneCompleter.future; | |
| 157 } | |
| 158 | |
| 159 /// Links source stream to controller when both are available. | |
| 160 void _linkStreamToController() { | |
| 161 assert(_controller != null); | |
| 162 assert(_sourceStream != null); | |
| 163 _controller.addStream(_sourceStream, cancelOnError: false) | |
| 164 .whenComplete(_controller.close); | |
| 165 } | |
| 166 | |
| 167 /// Sets an empty source stream. | |
| 168 /// | |
| 169 /// Uses [_controller] for the stream, then closes the controller | |
| 170 /// immediately. | |
| 171 void _setEmpty() { | |
| 172 assert(_sourceStream == null); | |
| 173 if (_controller == null) { | |
| 174 _createController(); | |
| 175 } | |
| 176 _sourceStream = _controller.stream; // Mark stream as set. | |
| 177 _controller.close(); | |
| 178 } | |
| 179 | |
| 180 // Creates the [_controller]. | |
| 181 void _createController() { | |
| 182 assert(_controller == null); | |
| 183 _controller = new StreamController<T>(sync: true); | |
| 184 } | |
| 185 } | |
| OLD | NEW |