OLD | NEW |
(Empty) | |
| 1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. |
| 4 |
| 5 library async.stream_completer; |
| 6 |
| 7 import "dart:async"; |
| 8 |
| 9 /// A single-subscription [stream] where the contents are provided later. |
| 10 /// |
| 11 /// It is generally recommended that you never create a `Future<Stream>` |
| 12 /// because you can just directly create a stream that doesn't do anything |
| 13 /// until it's ready to do so. |
| 14 /// This class can be used to create such a stream. |
| 15 /// |
| 16 /// The [stream] is a normal stream that you can listen to immediately, |
| 17 /// but until either [setSourceStream] or [setEmpty] is called, |
| 18 /// the stream won't produce any events. |
| 19 /// |
| 20 /// The same effect can be achieved by using a [StreamController] |
| 21 /// and adding the stream using `addStream` when both |
| 22 /// the controller's stream is listened to and the source stream is ready. |
| 23 /// This class attempts to shortcut some of the overhead when possible. |
| 24 /// For example, if the [stream] is only listened to |
| 25 /// after the source stream has been set, |
| 26 /// the listen is performed directly on the source stream. |
| 27 class StreamCompleter<T> { |
| 28 /// The stream doing the actual work, is returned by [stream]. |
| 29 final _CompleterStream _stream = new _CompleterStream<T>(); |
| 30 |
| 31 /// Convert a `Future<Stream>` to a `Stream`. |
| 32 /// |
| 33 /// This creates a stream using a stream completer, |
| 34 /// and sets the source stream to the result of the future when the |
| 35 /// future completes. |
| 36 /// |
| 37 /// If the future completes with an error, the returned stream will |
| 38 /// instead contain just that error. |
| 39 static Stream fromFuture(Future<Stream> streamFuture) { |
| 40 var completer = new StreamCompleter(); |
| 41 streamFuture.then(completer.setSourceStream, |
| 42 onError: (e, s) { |
| 43 completer.setSourceStream(streamFuture.asStream()); |
| 44 }); |
| 45 return completer.stream; |
| 46 } |
| 47 |
| 48 /// The stream of this completer. |
| 49 /// |
| 50 /// This stream is always a single-subscription stream. |
| 51 /// |
| 52 /// When a source stream is provided, its events will be forwarded to |
| 53 /// listeners on this stream. |
| 54 /// |
| 55 /// The stream can be listened either before or after a source stream |
| 56 /// is set. |
| 57 Stream<T> get stream => _stream; |
| 58 |
| 59 /// Set a stream as the source of events for the [StreamCompleter]'s |
| 60 /// [stream]. |
| 61 /// |
| 62 /// The completer's `stream` will act exactly as [sourceStream]. |
| 63 /// |
| 64 /// If the source stream is set before [stream] is listened to, |
| 65 /// the listen call on [stream] is forwarded directly to [sourceStream]. |
| 66 /// |
| 67 /// If [stream] is listened to before setting the source stream, |
| 68 /// an intermediate subscription is created. It looks like a completely |
| 69 /// normal subscription, and can be paused or canceled, but it won't |
| 70 /// produce any events until a source stream is provided. |
| 71 /// |
| 72 /// If the `stream` subscription is canceled before a source stream is set, |
| 73 /// the source stream will be listened to and immediately canceled again. |
| 74 /// |
| 75 /// Otherwise, when the source stream is then set, |
| 76 /// it is immediately listened to, and its events are forwarded to the |
| 77 /// existing subscription. |
| 78 /// |
| 79 /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| 80 /// Trying to call either of them again will fail. |
| 81 void setSourceStream(Stream<T> sourceStream) { |
| 82 if (_stream._isSourceStreamSet) { |
| 83 throw new StateError("Source stream already set"); |
| 84 } |
| 85 _stream._setSourceStream(sourceStream); |
| 86 } |
| 87 |
| 88 /// Equivalent to setting an empty stream using [setSourceStream]. |
| 89 /// |
| 90 /// Either [setSourceStream] or [setEmpty] may be called at most once. |
| 91 /// Trying to call either of them again will fail. |
| 92 void setEmpty() { |
| 93 if (_stream._isSourceStreamSet) { |
| 94 throw new StateError("Source stream already set"); |
| 95 } |
| 96 _stream._setEmpty(); |
| 97 } |
| 98 } |
| 99 |
| 100 /// Stream completed by [StreamCompleter]. |
| 101 class _CompleterStream<T> extends Stream<T> { |
| 102 /// Controller for an intermediate stream. |
| 103 /// |
| 104 /// Created if the user listens on this stream before the source stream |
| 105 /// is set, or if using [_setEmpty] so there is no source stream. |
| 106 StreamController _controller; |
| 107 |
| 108 /// Source stream for the events provided by this stream. |
| 109 /// |
| 110 /// Set when the completer sets the source stream using [_setSourceStream] |
| 111 /// or [_setEmpty]. |
| 112 Stream _sourceStream; |
| 113 |
| 114 StreamSubscription<T> listen(onData(T data), |
| 115 {Function onError, |
| 116 void onDone(), |
| 117 bool cancelOnError}) { |
| 118 if (_controller == null) { |
| 119 if (_sourceStream != null && !_sourceStream.isBroadcast) { |
| 120 // If the source stream is itself single subscription, |
| 121 // just listen to it directly instead of creating a controller. |
| 122 return _sourceStream.listen(onData, onError: onError, onDone: onDone, |
| 123 cancelOnError: cancelOnError); |
| 124 } |
| 125 _createController(); |
| 126 if (_sourceStream != null) { |
| 127 _linkStreamToController(); |
| 128 } |
| 129 } |
| 130 return _controller.stream.listen(onData, onError: onError, onDone: onDone, |
| 131 cancelOnError: cancelOnError); |
| 132 } |
| 133 |
| 134 /// Whether a source stream has been set. |
| 135 /// |
| 136 /// Used to throw an error if trying to set a source stream twice. |
| 137 bool get _isSourceStreamSet => _sourceStream != null; |
| 138 |
| 139 /// Sets the source stream providing the events for this stream. |
| 140 /// |
| 141 /// If set before the user listens, listen calls will be directed directly |
| 142 /// to the source stream. If the user listenes earlier, and intermediate |
| 143 /// stream is created using a stream controller, and the source stream is |
| 144 /// linked into that stream later. |
| 145 void _setSourceStream(Stream<T> sourceStream) { |
| 146 assert(_sourceStream == null); |
| 147 _sourceStream = sourceStream; |
| 148 if (_controller != null) { |
| 149 // User has already listened, so provide the data through controller. |
| 150 _linkStreamToController(); |
| 151 } |
| 152 } |
| 153 |
| 154 /// Links source stream to controller when both are available. |
| 155 void _linkStreamToController() { |
| 156 assert(_controller != null); |
| 157 assert(_sourceStream != null); |
| 158 _controller.addStream(_sourceStream, cancelOnError: false) |
| 159 .whenComplete(_controller.close); |
| 160 } |
| 161 |
| 162 /// Sets an empty source stream. |
| 163 /// |
| 164 /// Uses [_controller] for the stream, then closes the controller |
| 165 /// immediately. |
| 166 void _setEmpty() { |
| 167 assert(_sourceStream == null); |
| 168 if (_controller == null) { |
| 169 _createController(); |
| 170 } |
| 171 _sourceStream = _controller.stream; // Mark stream as set. |
| 172 _controller.close(); |
| 173 } |
| 174 |
| 175 // Creates the [_controller]. |
| 176 void _createController() { |
| 177 assert(_controller == null); |
| 178 _controller = new StreamController<T>(sync: true); |
| 179 } |
| 180 } |
OLD | NEW |