OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.async; | 5 part of dart.async; |
6 | 6 |
7 // ------------------------------------------------------------------- | 7 // ------------------------------------------------------------------- |
8 // Core Stream types | 8 // Core Stream types |
9 // ------------------------------------------------------------------- | 9 // ------------------------------------------------------------------- |
10 | 10 |
(...skipping 95 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
106 controller._closeUnchecked(); | 106 controller._closeUnchecked(); |
107 }, | 107 }, |
108 onError: (error, stackTrace) { | 108 onError: (error, stackTrace) { |
109 controller._addError(error, stackTrace); | 109 controller._addError(error, stackTrace); |
110 controller._closeUnchecked(); | 110 controller._closeUnchecked(); |
111 }); | 111 }); |
112 return controller.stream; | 112 return controller.stream; |
113 } | 113 } |
114 | 114 |
115 /** | 115 /** |
116 * Create a stream from a group of futures. | |
117 * | |
118 * The stream reports the results of the futures on the stream in the order | |
119 * in which the futures complete. | |
120 * | |
121 * If some futures have completed before calling `Stream.fromFutures`, | |
122 * their result will be output on the created stream in some unspecified | |
123 * order. | |
124 * | |
125 * When all futures have completed, the stream is closed. | |
126 * | |
127 * If no future is passed, the stream closes as soon as possible. | |
128 */ | |
129 factory Stream.fromFutures(Iterable<Future<T>> futures) { | |
130 var controller; | |
131 controller = new StreamController<T>(sync: true); | |
floitsch
2016/01/07 13:57:13
Merge these two lines?
Lasse Reichstein Nielsen
2016/01/08 09:18:48
Done.
| |
132 int count = 0; | |
133 var onValue = (value) { | |
134 if (!controller.isClosed) { | |
135 controller._add(value); | |
136 if (--count == 0) controller._closeUnchecked(); | |
137 } | |
138 }; | |
139 var onError = (error, stack) { | |
140 if (!controller.isClosed) { | |
141 controller._addError(error, stack); | |
142 if (--count == 0) controller._closeUnchecked(); | |
143 } | |
144 }; | |
145 // The futures are already running, so start listening to them immediately | |
146 // (instead of waiting for the stream to be listened on). | |
147 // If we wait, we might not catch errors in the futures in time. | |
148 for (var future in futures) { | |
149 count++; | |
150 future.then(onValue, onError: onError); | |
151 } | |
152 // Use schedule microtask since controller is sync. | |
153 if (count == 0) scheduleMicrotask(controller.close); | |
154 return controller.stream; | |
155 } | |
156 | |
157 /** | |
116 * Creates a single-subscription stream that gets its data from [data]. | 158 * Creates a single-subscription stream that gets its data from [data]. |
117 * | 159 * |
118 * The iterable is iterated when the stream receives a listener, and stops | 160 * The iterable is iterated when the stream receives a listener, and stops |
119 * iterating if the listener cancels the subscription. | 161 * iterating if the listener cancels the subscription. |
120 * | 162 * |
121 * If iterating [data] throws an error, the stream ends immediately with | 163 * If iterating [data] throws an error, the stream ends immediately with |
122 * that error. No done event will be sent (iteration is not complete), but no | 164 * that error. No done event will be sent (iteration is not complete), but no |
123 * further data events will be generated either, since iteration cannot | 165 * further data events will be generated either, since iteration cannot |
124 * continue. | 166 * continue. |
125 */ | 167 */ |
(...skipping 1625 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
1751 class _ControllerEventSinkWrapper<T> implements EventSink<T> { | 1793 class _ControllerEventSinkWrapper<T> implements EventSink<T> { |
1752 EventSink _sink; | 1794 EventSink _sink; |
1753 _ControllerEventSinkWrapper(this._sink); | 1795 _ControllerEventSinkWrapper(this._sink); |
1754 | 1796 |
1755 void add(T data) { _sink.add(data); } | 1797 void add(T data) { _sink.add(data); } |
1756 void addError(error, [StackTrace stackTrace]) { | 1798 void addError(error, [StackTrace stackTrace]) { |
1757 _sink.addError(error, stackTrace); | 1799 _sink.addError(error, stackTrace); |
1758 } | 1800 } |
1759 void close() { _sink.close(); } | 1801 void close() { _sink.close(); } |
1760 } | 1802 } |
OLD | NEW |