| OLD | NEW |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library async.stream_completer; | |
| 6 | |
| 7 import "dart:async"; | 5 import "dart:async"; |
| 8 | 6 |
| 9 /// A single-subscription [stream] where the contents are provided later. | 7 /// A single-subscription [stream] where the contents are provided later. |
| 10 /// | 8 /// |
| 11 /// It is generally recommended that you never create a `Future<Stream>` | 9 /// It is generally recommended that you never create a `Future<Stream>` |
| 12 /// because you can just directly create a stream that doesn't do anything | 10 /// because you can just directly create a stream that doesn't do anything |
| 13 /// until it's ready to do so. | 11 /// until it's ready to do so. |
| 14 /// This class can be used to create such a stream. | 12 /// This class can be used to create such a stream. |
| 15 /// | 13 /// |
| 16 /// The [stream] is a normal stream that you can listen to immediately, | 14 /// The [stream] is a normal stream that you can listen to immediately, |
| 17 /// but until either [setSourceStream] or [setEmpty] is called, | 15 /// but until either [setSourceStream] or [setEmpty] is called, |
| 18 /// the stream won't produce any events. | 16 /// the stream won't produce any events. |
| 19 /// | 17 /// |
| 20 /// The same effect can be achieved by using a [StreamController] | 18 /// The same effect can be achieved by using a [StreamController] |
| 21 /// and adding the stream using `addStream` when both | 19 /// and adding the stream using `addStream` when both |
| 22 /// the controller's stream is listened to and the source stream is ready. | 20 /// the controller's stream is listened to and the source stream is ready. |
| 23 /// This class attempts to shortcut some of the overhead when possible. | 21 /// This class attempts to shortcut some of the overhead when possible. |
| 24 /// For example, if the [stream] is only listened to | 22 /// For example, if the [stream] is only listened to |
| 25 /// after the source stream has been set, | 23 /// after the source stream has been set, |
| 26 /// the listen is performed directly on the source stream. | 24 /// the listen is performed directly on the source stream. |
| 27 class StreamCompleter<T> { | 25 class StreamCompleter<T> { |
| 28 /// The stream doing the actual work, is returned by [stream]. | 26 /// The stream doing the actual work, is returned by [stream]. |
| 29 final _CompleterStream _stream = new _CompleterStream<T>(); | 27 final _stream = new _CompleterStream<T>(); |
| 30 | 28 |
| 31 /// Convert a `Future<Stream>` to a `Stream`. | 29 /// Convert a `Future<Stream>` to a `Stream`. |
| 32 /// | 30 /// |
| 33 /// This creates a stream using a stream completer, | 31 /// This creates a stream using a stream completer, |
| 34 /// and sets the source stream to the result of the future when the | 32 /// and sets the source stream to the result of the future when the |
| 35 /// future completes. | 33 /// future completes. |
| 36 /// | 34 /// |
| 37 /// If the future completes with an error, the returned stream will | 35 /// If the future completes with an error, the returned stream will |
| 38 /// instead contain just that error. | 36 /// instead contain just that error. |
| 39 static Stream fromFuture(Future<Stream> streamFuture) { | 37 static Stream<T> fromFuture<T>(Future<Stream<T>> streamFuture) { |
| 40 var completer = new StreamCompleter(); | 38 var completer = new StreamCompleter<T>(); |
| 41 streamFuture.then(completer.setSourceStream, | 39 streamFuture.then(completer.setSourceStream, onError: completer.setError); |
| 42 onError: (e, s) { | |
| 43 completer.setSourceStream(streamFuture.asStream()); | |
| 44 }); | |
| 45 return completer.stream; | 40 return completer.stream; |
| 46 } | 41 } |
| 47 | 42 |
| 48 /// The stream of this completer. | 43 /// The stream of this completer. |
| 49 /// | 44 /// |
| 50 /// This stream is always a single-subscription stream. | 45 /// This stream is always a single-subscription stream. |
| 51 /// | 46 /// |
| 52 /// When a source stream is provided, its events will be forwarded to | 47 /// When a source stream is provided, its events will be forwarded to |
| 53 /// listeners on this stream. | 48 /// listeners on this stream. |
| 54 /// | 49 /// |
| (...skipping 14 matching lines...) Expand all Loading... |
| 69 /// normal subscription, and can be paused or canceled, but it won't | 64 /// normal subscription, and can be paused or canceled, but it won't |
| 70 /// produce any events until a source stream is provided. | 65 /// produce any events until a source stream is provided. |
| 71 /// | 66 /// |
| 72 /// If the `stream` subscription is canceled before a source stream is set, | 67 /// If the `stream` subscription is canceled before a source stream is set, |
| 73 /// the source stream will be listened to and immediately canceled again. | 68 /// the source stream will be listened to and immediately canceled again. |
| 74 /// | 69 /// |
| 75 /// Otherwise, when the source stream is then set, | 70 /// Otherwise, when the source stream is then set, |
| 76 /// it is immediately listened to, and its events are forwarded to the | 71 /// it is immediately listened to, and its events are forwarded to the |
| 77 /// existing subscription. | 72 /// existing subscription. |
| 78 /// | 73 /// |
| 79 /// Either [setSourceStream] or [setEmpty] may be called at most once. | 74 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at |
| 80 /// Trying to call either of them again will fail. | 75 /// most once. Trying to call any of them again will fail. |
| 81 void setSourceStream(Stream<T> sourceStream) { | 76 void setSourceStream(Stream<T> sourceStream) { |
| 82 if (_stream._isSourceStreamSet) { | 77 if (_stream._isSourceStreamSet) { |
| 83 throw new StateError("Source stream already set"); | 78 throw new StateError("Source stream already set"); |
| 84 } | 79 } |
| 85 _stream._setSourceStream(sourceStream); | 80 _stream._setSourceStream(sourceStream); |
| 86 } | 81 } |
| 87 | 82 |
| 88 /// Equivalent to setting an empty stream using [setSourceStream]. | 83 /// Equivalent to setting an empty stream using [setSourceStream]. |
| 89 /// | 84 /// |
| 90 /// Either [setSourceStream] or [setEmpty] may be called at most once. | 85 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at |
| 91 /// Trying to call either of them again will fail. | 86 /// most once. Trying to call any of them again will fail. |
| 92 void setEmpty() { | 87 void setEmpty() { |
| 93 if (_stream._isSourceStreamSet) { | 88 if (_stream._isSourceStreamSet) { |
| 94 throw new StateError("Source stream already set"); | 89 throw new StateError("Source stream already set"); |
| 95 } | 90 } |
| 96 _stream._setEmpty(); | 91 _stream._setEmpty(); |
| 97 } | 92 } |
| 93 |
| 94 /// Completes this to a stream that emits [error] and then closes. |
| 95 /// |
| 96 /// This is useful when the process of creating the data for the stream fails. |
| 97 /// |
| 98 /// Any one of [setSourceStream], [setEmpty], and [setError] may be called at |
| 99 /// most once. Trying to call any of them again will fail. |
| 100 void setError(error, [StackTrace stackTrace]) { |
| 101 setSourceStream(new Stream.fromFuture(new Future.error(error, stackTrace))); |
| 102 } |
| 98 } | 103 } |
| 99 | 104 |
| 100 /// Stream completed by [StreamCompleter]. | 105 /// Stream completed by [StreamCompleter]. |
| 101 class _CompleterStream<T> extends Stream<T> { | 106 class _CompleterStream<T> extends Stream<T> { |
| 102 /// Controller for an intermediate stream. | 107 /// Controller for an intermediate stream. |
| 103 /// | 108 /// |
| 104 /// Created if the user listens on this stream before the source stream | 109 /// Created if the user listens on this stream before the source stream |
| 105 /// is set, or if using [_setEmpty] so there is no source stream. | 110 /// is set, or if using [_setEmpty] so there is no source stream. |
| 106 StreamController _controller; | 111 StreamController<T> _controller; |
| 107 | 112 |
| 108 /// Source stream for the events provided by this stream. | 113 /// Source stream for the events provided by this stream. |
| 109 /// | 114 /// |
| 110 /// Set when the completer sets the source stream using [_setSourceStream] | 115 /// Set when the completer sets the source stream using [_setSourceStream] |
| 111 /// or [_setEmpty]. | 116 /// or [_setEmpty]. |
| 112 Stream _sourceStream; | 117 Stream<T> _sourceStream; |
| 113 | 118 |
| 114 StreamSubscription<T> listen(onData(T data), | 119 StreamSubscription<T> listen(onData(T data), |
| 115 {Function onError, | 120 {Function onError, void onDone(), bool cancelOnError}) { |
| 116 void onDone(), | |
| 117 bool cancelOnError}) { | |
| 118 if (_controller == null) { | 121 if (_controller == null) { |
| 119 if (_sourceStream != null && !_sourceStream.isBroadcast) { | 122 if (_sourceStream != null && !_sourceStream.isBroadcast) { |
| 120 // If the source stream is itself single subscription, | 123 // If the source stream is itself single subscription, |
| 121 // just listen to it directly instead of creating a controller. | 124 // just listen to it directly instead of creating a controller. |
| 122 return _sourceStream.listen(onData, onError: onError, onDone: onDone, | 125 return _sourceStream.listen(onData, |
| 123 cancelOnError: cancelOnError); | 126 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 124 } | 127 } |
| 125 _createController(); | 128 _createController(); |
| 126 if (_sourceStream != null) { | 129 if (_sourceStream != null) { |
| 127 _linkStreamToController(); | 130 _linkStreamToController(); |
| 128 } | 131 } |
| 129 } | 132 } |
| 130 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | 133 return _controller.stream.listen(onData, |
| 131 cancelOnError: cancelOnError); | 134 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
| 132 } | 135 } |
| 133 | 136 |
| 134 /// Whether a source stream has been set. | 137 /// Whether a source stream has been set. |
| 135 /// | 138 /// |
| 136 /// Used to throw an error if trying to set a source stream twice. | 139 /// Used to throw an error if trying to set a source stream twice. |
| 137 bool get _isSourceStreamSet => _sourceStream != null; | 140 bool get _isSourceStreamSet => _sourceStream != null; |
| 138 | 141 |
| 139 /// Sets the source stream providing the events for this stream. | 142 /// Sets the source stream providing the events for this stream. |
| 140 /// | 143 /// |
| 141 /// If set before the user listens, listen calls will be directed directly | 144 /// If set before the user listens, listen calls will be directed directly |
| 142 /// to the source stream. If the user listenes earlier, and intermediate | 145 /// to the source stream. If the user listenes earlier, and intermediate |
| 143 /// stream is created using a stream controller, and the source stream is | 146 /// stream is created using a stream controller, and the source stream is |
| 144 /// linked into that stream later. | 147 /// linked into that stream later. |
| 145 void _setSourceStream(Stream<T> sourceStream) { | 148 void _setSourceStream(Stream<T> sourceStream) { |
| 146 assert(_sourceStream == null); | 149 assert(_sourceStream == null); |
| 147 _sourceStream = sourceStream; | 150 _sourceStream = sourceStream; |
| 148 if (_controller != null) { | 151 if (_controller != null) { |
| 149 // User has already listened, so provide the data through controller. | 152 // User has already listened, so provide the data through controller. |
| 150 _linkStreamToController(); | 153 _linkStreamToController(); |
| 151 } | 154 } |
| 152 } | 155 } |
| 153 | 156 |
| 154 /// Links source stream to controller when both are available. | 157 /// Links source stream to controller when both are available. |
| 155 void _linkStreamToController() { | 158 void _linkStreamToController() { |
| 156 assert(_controller != null); | 159 assert(_controller != null); |
| 157 assert(_sourceStream != null); | 160 assert(_sourceStream != null); |
| 158 _controller.addStream(_sourceStream, cancelOnError: false) | 161 _controller |
| 159 .whenComplete(_controller.close); | 162 .addStream(_sourceStream, cancelOnError: false) |
| 163 .whenComplete(_controller.close); |
| 160 } | 164 } |
| 161 | 165 |
| 162 /// Sets an empty source stream. | 166 /// Sets an empty source stream. |
| 163 /// | 167 /// |
| 164 /// Uses [_controller] for the stream, then closes the controller | 168 /// Uses [_controller] for the stream, then closes the controller |
| 165 /// immediately. | 169 /// immediately. |
| 166 void _setEmpty() { | 170 void _setEmpty() { |
| 167 assert(_sourceStream == null); | 171 assert(_sourceStream == null); |
| 168 if (_controller == null) { | 172 if (_controller == null) { |
| 169 _createController(); | 173 _createController(); |
| 170 } | 174 } |
| 171 _sourceStream = _controller.stream; // Mark stream as set. | 175 _sourceStream = _controller.stream; // Mark stream as set. |
| 172 _controller.close(); | 176 _controller.close(); |
| 173 } | 177 } |
| 174 | 178 |
| 175 // Creates the [_controller]. | 179 // Creates the [_controller]. |
| 176 void _createController() { | 180 void _createController() { |
| 177 assert(_controller == null); | 181 assert(_controller == null); |
| 178 _controller = new StreamController<T>(sync: true); | 182 _controller = new StreamController<T>(sync: true); |
| 179 } | 183 } |
| 180 } | 184 } |
| OLD | NEW |