| OLD | NEW |
| 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 library utils; | 5 library utils; |
| 6 | 6 |
| 7 import 'dart:async'; | 7 import 'dart:async'; |
| 8 import 'dart:crypto'; | 8 import 'dart:crypto'; |
| 9 import 'dart:io'; | 9 import 'dart:io'; |
| 10 import 'dart:scalarlist'; | 10 import 'dart:scalarlist'; |
| (...skipping 131 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 142 | 142 |
| 143 /// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished. | 143 /// Calls [onDone] once [stream] (a single-subscription [Stream]) is finished. |
| 144 /// The return value, also a single-subscription [Stream] should be used in | 144 /// The return value, also a single-subscription [Stream] should be used in |
| 145 /// place of [stream] after calling this method. | 145 /// place of [stream] after calling this method. |
| 146 Stream onDone(Stream stream, void onDone()) { | 146 Stream onDone(Stream stream, void onDone()) { |
| 147 var pair = tee(stream); | 147 var pair = tee(stream); |
| 148 pair.first.listen((_) {}, onError: (_) {}, onDone: onDone); | 148 pair.first.listen((_) {}, onError: (_) {}, onDone: onDone); |
| 149 return pair.last; | 149 return pair.last; |
| 150 } | 150 } |
| 151 | 151 |
| 152 // TODO(nweiz): remove this once issue 7785 is fixed. | |
| 153 /// Wraps [stream] in a single-subscription [ByteStream] that emits the same | |
| 154 /// data. | |
| 155 ByteStream wrapInputStream(InputStream stream) { | |
| 156 if (stream.closed) return emptyStream; | |
| 157 | |
| 158 var controller = new StreamController(); | |
| 159 stream.onClosed = controller.close; | |
| 160 stream.onData = () => controller.add(stream.read()); | |
| 161 stream.onError = (e) => controller.signalError(new AsyncError(e)); | |
| 162 return new ByteStream(controller.stream); | |
| 163 } | |
| 164 | |
| 165 // TODO(nweiz): remove this once issue 7785 is fixed. | |
| 166 /// Wraps [stream] in a [StreamConsumer] so that [Stream]s can by piped into it | |
| 167 /// using [Stream.pipe]. | |
| 168 StreamConsumer<List<int>, dynamic> wrapOutputStream(OutputStream stream) => | |
| 169 new _OutputStreamConsumer(stream); | |
| 170 | |
| 171 /// A [StreamConsumer] that pipes data into an [OutputStream]. | |
| 172 class _OutputStreamConsumer implements StreamConsumer<List<int>, dynamic> { | |
| 173 final OutputStream _outputStream; | |
| 174 | |
| 175 _OutputStreamConsumer(this._outputStream); | |
| 176 | |
| 177 Future consume(Stream<List<int>> stream) { | |
| 178 // TODO(nweiz): we have to manually keep track of whether or not the | |
| 179 // completer has completed since the output stream could signal an error | |
| 180 // after close() has been called but before it has shut down internally. See | |
| 181 // the following TODO. | |
| 182 var completed = false; | |
| 183 var completer = new Completer(); | |
| 184 stream.listen((data) { | |
| 185 // Writing empty data to a closed stream can cause errors. | |
| 186 if (data.isEmpty) return; | |
| 187 | |
| 188 // TODO(nweiz): remove this try/catch when issue 7836 is fixed. | |
| 189 try { | |
| 190 _outputStream.write(data); | |
| 191 } catch (e, stack) { | |
| 192 if (!completed) completer.completeError(e, stack); | |
| 193 completed = true; | |
| 194 } | |
| 195 }, onDone: () => _outputStream.close()); | |
| 196 | |
| 197 _outputStream.onError = (e) { | |
| 198 if (!completed) completer.completeError(e); | |
| 199 completed = true; | |
| 200 }; | |
| 201 | |
| 202 _outputStream.onClosed = () { | |
| 203 if (!completed) completer.complete(); | |
| 204 completed = true; | |
| 205 }; | |
| 206 | |
| 207 return completer.future; | |
| 208 } | |
| 209 } | |
| 210 | |
| 211 // TODO(nweiz): remove this when issue 7786 is fixed. | 152 // TODO(nweiz): remove this when issue 7786 is fixed. |
| 212 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, | 153 /// Pipes all data and errors from [stream] into [sink]. When [stream] is done, |
| 213 /// [sink] is closed and the returned [Future] is completed. | 154 /// [sink] is closed and the returned [Future] is completed. |
| 214 Future store(Stream stream, StreamSink sink) { | 155 Future store(Stream stream, StreamSink sink) { |
| 215 var completer = new Completer(); | 156 var completer = new Completer(); |
| 216 stream.listen(sink.add, | 157 stream.listen(sink.add, |
| 217 onError: sink.signalError, | 158 onError: sink.signalError, |
| 218 onDone: () { | 159 onDone: () { |
| 219 sink.close(); | 160 sink.close(); |
| 220 completer.complete(); | 161 completer.complete(); |
| (...skipping 94 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
| 315 /// [StreamController]. This exists to work around issue 8310. | 256 /// [StreamController]. This exists to work around issue 8310. |
| 316 Stream wrapStream(Stream stream) { | 257 Stream wrapStream(Stream stream) { |
| 317 var controller = stream.isBroadcast | 258 var controller = stream.isBroadcast |
| 318 ? new StreamController.broadcast() | 259 ? new StreamController.broadcast() |
| 319 : new StreamController(); | 260 : new StreamController(); |
| 320 stream.listen(controller.add, | 261 stream.listen(controller.add, |
| 321 onError: (e) => controller.signalError(e), | 262 onError: (e) => controller.signalError(e), |
| 322 onDone: controller.close); | 263 onDone: controller.close); |
| 323 return controller.stream; | 264 return controller.stream; |
| 324 } | 265 } |
| OLD | NEW |