OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 library utils; | 5 library utils; |
6 | 6 |
7 import 'dart:async'; | 7 import 'dart:async'; |
8 import 'dart:crypto'; | 8 import 'dart:crypto'; |
9 import 'dart:io'; | 9 import 'dart:io'; |
10 import 'dart:scalarlist'; | 10 import 'dart:scalarlist'; |
(...skipping 154 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
165 // TODO(nweiz): remove this once issue 7785 is fixed. | 165 // TODO(nweiz): remove this once issue 7785 is fixed. |
166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it | 166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it |
167 /// using [Stream.pipe]. | 167 /// using [Stream.pipe]. |
168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => | 168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => |
169 new _OutputStreamConsumer(stream); | 169 new _OutputStreamConsumer(stream); |
170 | 170 |
171 /// A [StreamConsumer] that pipes data into an [OutputStream]. | 171 /// A [StreamConsumer] that pipes data into an [OutputStream]. |
172 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { | 172 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { |
173 final OutputStream _outputStream; | 173 final OutputStream _outputStream; |
174 | 174 |
175 _OutputStreamConsumer(this._outputStream) | 175 _OutputStreamConsumer(this._outputStream); |
176 : super(); | |
177 | 176 |
178 Future consume(Stream<List<int>> stream) { | 177 Future consume(Stream<List<int>> stream) { |
179 // TODO(nweiz): we have to manually keep track of whether or not the | 178 // TODO(nweiz): we have to manually keep track of whether or not the |
180 // completer has completed since the output stream could signal an error | 179 // completer has completed since the output stream could signal an error |
181 // after close() has been called but before it has shut down internally. See | 180 // after close() has been called but before it has shut down internally. See |
182 // the following TODO. | 181 // the following TODO. |
183 var completed = false; | 182 var completed = false; |
184 var completer = new Completer(); | 183 var completer = new Completer(); |
185 stream.listen((data) => _outputStream.write(data), onDone: () { | 184 stream.listen((data) { |
186 _outputStream.close(); | 185 // Writing empty data to a closed stream can cause errors. |
187 // TODO(nweiz): wait until _outputStream.onClosed is called once issue | 186 if (data.isEmpty) return; |
188 // 7761 is fixed. | 187 |
189 if (!completed) completer.complete(); | 188 // TODO(nweiz): remove this try/catch when issue 7836 is fixed. |
190 completed = true; | 189 try { |
191 }); | 190 _outputStream.write(data); |
| 191 } catch (e, stack) { |
| 192 if (!completed) completer.completeError(e, stack); |
| 193 completed = true; |
| 194 } |
| 195 }, onDone: () => _outputStream.close()); |
192 | 196 |
193 _outputStream.onError = (e) { | 197 _outputStream.onError = (e) { |
194 if (!completed) completer.completeError(e); | 198 if (!completed) completer.completeError(e); |
195 completed = true; | 199 completed = true; |
196 }; | 200 }; |
197 | 201 |
| 202 _outputStream.onClosed = () { |
| 203 if (!completed) completer.complete(); |
| 204 completed = true; |
| 205 }; |
| 206 |
198 return completer.future; | 207 return completer.future; |
199 } | 208 } |
200 } | 209 } |
201 | 210 |
202 // TODO(nweiz): remove this when issue 7786 is fixed. | 211 // TODO(nweiz): remove this when issue 7786 is fixed. |
203 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, | 212 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
204 /// [sink] is closed and the returned [Future] is completed. | 213 /// [sink] is closed and the returned [Future] is completed. |
205 Future store(Stream stream, StreamSink sink) { | 214 Future store(Stream stream, StreamSink sink) { |
206 var completer = new Completer(); | 215 var completer = new Completer(); |
207 stream.listen(sink.add, | 216 stream.listen(sink.add, |
(...skipping 85 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
293 /// The return values of all [Future]s are discarded. Any errors will cause the | 302 /// The return values of all [Future]s are discarded. Any errors will cause the |
294 /// iteration to stop and will be piped through the return value. | 303 /// iteration to stop and will be piped through the return value. |
295 Future forEachFuture(Iterable input, Future fn(element)) { | 304 Future forEachFuture(Iterable input, Future fn(element)) { |
296 var iterator = input.iterator; | 305 var iterator = input.iterator; |
297 Future nextElement(_) { | 306 Future nextElement(_) { |
298 if (!iterator.moveNext()) return new Future.immediate(null); | 307 if (!iterator.moveNext()) return new Future.immediate(null); |
299 return fn(iterator.current).then(nextElement); | 308 return fn(iterator.current).then(nextElement); |
300 } | 309 } |
301 return nextElement(null); | 310 return nextElement(null); |
302 } | 311 } |
OLD | NEW |