OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2015, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 library async.stream_completer; | |
6 | |
7 import "dart:async"; | |
8 | |
9 /// A single-subscription [stream] where the contents are provided later. | |
10 /// | |
11 /// It is generally recommended that you never create a `Future<Stream>` | |
12 /// because you can just directly create a stream that doesn't do anything | |
13 /// until it's ready to do so. | |
14 /// This class can be used to create such a stream. | |
15 /// | |
16 /// The [stream] is a normal stream that you can listen to immediately, | |
17 /// but until either [setSourceStream] or [setEmpty] is called, | |
18 /// the stream won't produce any events. | |
19 /// | |
20 /// The same effect can be achieved by using a [StreamController] | |
21 /// and adding the stream using `addStream` when both | |
22 /// the controller's stream is listened to and the source stream is ready. | |
23 /// This class attempts to shortcut some of the overhead when possible. | |
24 /// For example, if the [stream] is only listened to | |
25 /// after the source stream has been set, | |
26 /// the listen is performed directly on the source stream. | |
27 class StreamCompleter<T> { | |
28 /// The stream doing the actual work, is returned by [stream]. | |
29 final _CompleterStream _stream = new _CompleterStream<T>(); | |
30 | |
31 /// Convert a `Future<Stream>` to a `Stream`. | |
32 /// | |
33 /// This creates a stream using a stream completer, | |
34 /// and sets the source stream to the result of the future when the | |
35 /// future completes. | |
36 /// | |
37 /// If the future completes with an error, the returned stream will | |
38 /// instead contain just that error. | |
39 static Stream fromFuture(Future<Stream> streamFuture) { | |
40 var completer = new StreamCompleter(); | |
41 streamFuture.then(completer.setSourceStream, | |
42 onError: (e, s) { | |
43 completer.setSourceStream(streamFuture.asStream()); | |
44 }); | |
45 return completer.stream; | |
46 } | |
47 | |
48 /// The stream of this completer. | |
49 /// | |
50 /// When a source stream is provided, its events will be forwarded to | |
51 /// listeners on this stream. | |
52 /// | |
53 /// The stream can be listened either before or after a source stream | |
54 /// is set. | |
nweiz
2015/06/30 23:39:48
Document that this is always single-subscription,
Lasse Reichstein Nielsen
2015/07/01 08:24:57
Done.
| |
55 Stream<T> get stream => _stream; | |
56 | |
57 /// Set a stream as the source of events for the [StreamCompleter]'s | |
58 /// [stream]. | |
59 /// | |
60 /// The completer's `stream` will act exactly as [sourceStream]. | |
61 /// | |
62 /// If the source stream is set before [stream] is listened to, | |
63 /// the listen call on [stream] is forwarded directly to [sourceStream]. | |
64 /// | |
65 /// If [stream] is listened to before setting the source stream, | |
66 /// an intermediate subscription is created. It looks like a completely | |
67 /// normal subscription, and can be paused or canceled, but it won't | |
68 /// produce any events until a source stream is provided. | |
69 /// | |
70 /// If the `stream` subscription is canceled before a source stream is set, | |
71 /// the source stream will be listened to and immediately canceled again. | |
72 /// | |
73 /// Otherwise, when the source stream is then set, | |
74 /// it is immediately listened to, and its events are forwarded to the | |
75 /// existing subscription. | |
76 /// | |
77 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
78 /// Trying to call either of them again will fail. | |
79 void setSourceStream(Stream<T> sourceStream) { | |
80 if (_stream._isSourceStreamSet) { | |
81 throw new StateError("Source stream already set"); | |
82 } | |
83 _stream._setSourceStream(sourceStream); | |
84 } | |
85 | |
86 /// Equivalent to setting an empty stream using [setSourceStream]. | |
87 /// | |
88 /// Either [setSourceStream] or [setEmpty] may be called at most once. | |
89 /// Trying to call either of them again will fail. | |
90 void setEmpty() { | |
91 if (_stream._isSourceStreamSet) { | |
92 throw new StateError("Source stream already set"); | |
93 } | |
94 _stream._setEmpty(); | |
95 } | |
96 } | |
97 | |
98 /// Stream completed by [StreamCompleter]. | |
99 class _CompleterStream<T> extends Stream<T> { | |
100 /// Controller for an intermediate stream. | |
101 /// | |
102 /// Created if the user listens on this stream before the source stream | |
103 /// is set, or if using [_setEmpty] so there is no source stream. | |
104 StreamController _controller; | |
105 | |
106 /// Source stream for the events provided by this stream. | |
107 /// | |
108 /// Set when the completer sets the source stream using [_setSourceStream] | |
109 /// or [_setEmpty]. | |
110 Stream _sourceStream; | |
111 | |
112 /// Completer for future returned by [setSourceStream]. | |
113 /// | |
114 /// The future is completed when the stream has been added or canceled, | |
115 /// and if canceling causes an error, the future is completed with that error. | |
116 final Completer _doneCompleter = new Completer(); | |
nweiz
2015/06/30 23:39:48
It looks like this is never completed.
Lasse Reichstein Nielsen
2015/07/01 08:24:57
True. It should be completed when the source strea
| |
117 | |
118 StreamSubscription<T> listen(onData(T data), | |
119 {Function onError, | |
120 void onDone(), | |
121 bool cancelOnError}) { | |
122 if (_controller == null) { | |
123 if (_sourceStream != null && !_sourceStream.isBroadcast) { | |
124 // If the source stream is itself single subscription, | |
125 // just listen to it directly instead of creating a controller. | |
126 return _sourceStream.listen(onData, onError: onError, onDone: onDone, | |
127 cancelOnError: cancelOnError); | |
128 } | |
129 _createController(); | |
130 if (_sourceStream != null) { | |
131 _linkStreamToController(); | |
132 } | |
133 } | |
134 return _controller.stream.listen(onData, onError: onError, onDone: onDone, | |
135 cancelOnError: cancelOnError); | |
136 } | |
137 | |
138 /// Whether a source stream has been set. | |
139 /// | |
140 /// Used to throw an error if trying to set a source stream twice. | |
141 bool get _isSourceStreamSet => _sourceStream != null; | |
142 | |
143 /// Sets the source stream providing the events for this stream. | |
144 /// | |
145 /// If set before the user listens, listen calls will be directed directly | |
146 /// to the source stream. If the user listenes earlier, and intermediate | |
147 /// stream is created using a stream controller, and the source stream is | |
148 /// linked into that stream later. | |
149 Future _setSourceStream(Stream<T> sourceStream) { | |
nweiz
2015/06/30 23:39:47
It looks like this return value is unused. Should
Lasse Reichstein Nielsen
2015/07/01 08:24:57
I removed the _doneCompleter entirely. If someone
| |
150 assert(_sourceStream == null); | |
151 _sourceStream = sourceStream; | |
152 if (_controller != null) { | |
153 // User has already listened, so provide the data through controller. | |
154 _linkStreamToController(); | |
155 } | |
156 return _doneCompleter.future; | |
157 } | |
158 | |
159 /// Links source stream to controller when both are available. | |
160 void _linkStreamToController() { | |
161 assert(_controller != null); | |
162 assert(_sourceStream != null); | |
163 _controller.addStream(_sourceStream, cancelOnError: false) | |
164 .whenComplete(_controller.close); | |
165 } | |
166 | |
167 /// Sets an empty source stream. | |
168 /// | |
169 /// Uses [_controller] for the stream, then closes the controller | |
170 /// immediately. | |
171 void _setEmpty() { | |
172 assert(_sourceStream == null); | |
173 if (_controller == null) { | |
174 _createController(); | |
175 } | |
176 _sourceStream = _controller.stream; // Mark stream as set. | |
177 _controller.close(); | |
178 } | |
179 | |
180 // Creates the [_controller]. | |
181 void _createController() { | |
182 assert(_controller == null); | |
183 _controller = new StreamController<T>(sync: true); | |
184 } | |
185 } | |
OLD | NEW |