OLD | NEW |
1 // Copyright 2014 Google Inc. All Rights Reserved. | 1 // Copyright 2014 Google Inc. All Rights Reserved. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 part of quiver.async; | 15 part of quiver.async; |
16 | 16 |
17 /** | 17 /// A Stream that will emit the same values as the stream returned by [future] |
18 * A Stream that will emit the same values as the stream returned by [future] | 18 /// once [future] completes. |
19 * once [future] completes. | 19 /// |
20 * | 20 /// If [future] completes to an error, the return value will emit that error |
21 * If [future] completes to an error, the return value will emit that error and | 21 /// and then close. |
22 * then close. | 22 /// |
23 * | 23 /// If [broadcast] is true, this will be a broadcast stream. This assumes that |
24 * If [broadcast] is true, this will be a broadcast stream. This assumes that | 24 /// the stream returned by [future] will be a broadcast stream as well. |
25 * the stream returned by [future] will be a broadcast stream as well. | 25 /// [broadcast] defaults to false. |
26 * [broadcast] defaults to false. | 26 /// |
27 * | 27 /// # Example |
28 * # Example | 28 /// |
29 * | 29 /// This class is useful when you need to retreive some object via a `Future`, |
30 * This class is useful when you need to retreive some object via a `Future`, | 30 /// then return a `Stream` from that object: |
31 * then return a `Stream` from that object: | 31 /// |
32 * | 32 /// var futureOfStream = getResource().then((resource) => resource.stream); |
33 * var futureOfStream = getResource().then((resource) => resource.stream); | 33 /// return new FutureStream(futureOfStream); |
34 * return new FutureStream(futureOfStream); | |
35 */ | |
36 class FutureStream<T> extends Stream<T> { | 34 class FutureStream<T> extends Stream<T> { |
| 35 static T _identity<T>(T t) => t; |
| 36 |
37 Future<Stream<T>> _future; | 37 Future<Stream<T>> _future; |
38 StreamController<T> _controller; | 38 StreamController<T> _controller; |
39 StreamSubscription _subscription; | 39 StreamSubscription _subscription; |
40 | 40 |
41 FutureStream(Future<Stream<T>> future, {bool broadcast: false}) { | 41 FutureStream(Future<Stream<T>> future, {bool broadcast: false}) { |
42 _future = future.catchError((e, stackTrace) { | 42 _future = future.then(_identity, onError: (e, stackTrace) { |
43 // Since [controller] is synchronous, it's likely that emitting an error | 43 // Since [controller] is synchronous, it's likely that emitting an error |
44 // will cause it to be cancelled before we call close. | 44 // will cause it to be cancelled before we call close. |
45 if (_controller != null) { | 45 if (_controller != null) { |
46 _controller.addError(e, stackTrace); | 46 _controller.addError(e, stackTrace); |
47 _controller.close(); | 47 _controller.close(); |
48 } | 48 } |
49 _controller = null; | 49 _controller = null; |
50 }); | 50 }); |
51 | 51 |
52 if (broadcast == true) { | 52 if (broadcast == true) { |
(...skipping 20 matching lines...) Expand all Loading... |
73 } | 73 } |
74 | 74 |
75 StreamSubscription<T> listen(void onData(T event), | 75 StreamSubscription<T> listen(void onData(T event), |
76 {Function onError, void onDone(), bool cancelOnError}) { | 76 {Function onError, void onDone(), bool cancelOnError}) { |
77 return _controller.stream.listen(onData, | 77 return _controller.stream.listen(onData, |
78 onError: onError, onDone: onDone, cancelOnError: cancelOnError); | 78 onError: onError, onDone: onDone, cancelOnError: cancelOnError); |
79 } | 79 } |
80 | 80 |
81 bool get isBroadcast => _controller.stream.isBroadcast; | 81 bool get isBroadcast => _controller.stream.isBroadcast; |
82 } | 82 } |
OLD | NEW |