Chromium Code Reviews| Index: lib/src/byte_collector.dart |
| diff --git a/lib/src/byte_collector.dart b/lib/src/byte_collector.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..e1157bb2c1a10e0a046198d9e08aa6902804774c |
| --- /dev/null |
| +++ b/lib/src/byte_collector.dart |
| @@ -0,0 +1,73 @@ |
| +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| +import "dart:async"; |
| +import "dart:typed_data"; |
| + |
| +/// A stream transformer that collects byte list events into a single list. |
| +/// |
| +/// The transformer accumulates lists of bytes, and when the source stream ends, |
| +/// all the bytes are emitted as a single [Uint8List] event on the resulting |
| +/// stream. |
| +/// |
| +/// If any of the input data are not valid bytes, they will be truncated to |
| +/// an eight-bit unsigned value in the resulting list. |
| +/// |
| +/// Any errors events are forwarded directly to the resulting stream. |
| +const StreamTransformer<List<int>, Uint8List> byteCollector = |
| + const StreamTransformer<List<int>, Uint8List>(_collectionTransformer); |
|
nweiz
2017/01/25 23:00:40
I'm not sure I see the use of a transformer here.
Lasse Reichstein Nielsen
2017/01/26 09:43:05
It might be a little too speculative :)
I was thin
|
| + |
| +/// Collects an asynchronous sequence of byte lists into a single list of bytes. |
| +/// |
| +/// If the [source] stream emits an error event, |
| +/// the collection fails and the returned future completes with the same error. |
| +/// |
| +/// If any of the input data are not valid bytes, they will be truncated to |
| +/// an eight-bit unsigned value in the resulting list. |
| +/// |
| +/// Equivalent to `source.transform(byteCollector).first`. |
| +Future<Uint8List> collectBytes(Stream<List<int>> source) async { |
| + List<List<int>> byteLists = []; |
|
nweiz
2017/01/25 23:00:40
"var byteLists = <List<int>>[]"
Also "var" for as
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Done.
(Even though I was actually deliberately cre
|
| + int length = 0; |
| + await for (var list in source) { |
|
nweiz
2017/01/25 23:00:40
I'm pretty sure "await for" is still very slow on
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Argh.
On one hand, I'm convinced it will never be
|
| + byteLists.add(list); |
| + length += list.length; |
| + } |
| + return _collect(length, byteLists); |
| +} |
| + |
| +// Join a lists of bytes with a known total length into a single [Uint8List]. |
| +Uint8List _collect(int length, List<List<int>> byteLists) { |
| + var result = new Uint8List(length); |
| + int i = 0; |
| + for (var byteList in byteLists) { |
| + var end = i + byteList.length; |
| + result.setRange(i, end, byteList); |
| + i = end; |
| + } |
| + return result; |
| +} |
| + |
| +/// Collects byte lists from a stream into a single byte list stream. |
| +StreamSubscription<Uint8List> _collectionTransformer( |
| + Stream<List<int>> stream, bool cancelOnError) { |
| + List<List<int>> byteLists = []; |
| + int length = 0; |
| + var controller = new StreamController<Uint8List>(sync: true); |
| + StreamSubscription sourceSubscription = stream.listen( |
| + (List<int> data) { |
| + byteLists.add(data); |
| + length += data.length; |
| + }, |
| + onError: controller.addError, |
| + onDone: () { |
| + controller.add(_collect(length, byteLists)); |
| + controller.close(); |
| + }); |
| + controller |
| + ..onPause = sourceSubscription.pause |
| + ..onResume = sourceSubscription.resume |
| + ..onCancel = sourceSubscription.cancel; |
| + return controller.stream.listen(null, cancelOnError: cancelOnError); |
| +} |