Index: lib/src/byte_collector.dart |
diff --git a/lib/src/byte_collector.dart b/lib/src/byte_collector.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..e1157bb2c1a10e0a046198d9e08aa6902804774c |
--- /dev/null |
+++ b/lib/src/byte_collector.dart |
@@ -0,0 +1,73 @@ |
+// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+import "dart:async"; |
+import "dart:typed_data"; |
+ |
+/// A stream transformer that collects byte list events into a single list. |
+/// |
+/// The transformer accumulates lists of bytes, and when the source stream ends, |
+/// all the bytes are emitted as a single [Uint8List] event on the resulting |
+/// stream. |
+/// |
+/// If any of the input data are not valid bytes, they will be truncated to |
+/// an eight-bit unsigned value in the resulting list. |
+/// |
+/// Any errors events are forwarded directly to the resulting stream. |
+const StreamTransformer<List<int>, Uint8List> byteCollector = |
+ const StreamTransformer<List<int>, Uint8List>(_collectionTransformer); |
nweiz
2017/01/25 23:00:40
I'm not sure I see the use of a transformer here.
Lasse Reichstein Nielsen
2017/01/26 09:43:05
It might be a little too speculative :)
I was thin
|
+ |
+/// Collects an asynchronous sequence of byte lists into a single list of bytes. |
+/// |
+/// If the [source] stream emits an error event, |
+/// the collection fails and the returned future completes with the same error. |
+/// |
+/// If any of the input data are not valid bytes, they will be truncated to |
+/// an eight-bit unsigned value in the resulting list. |
+/// |
+/// Equivalent to `source.transform(byteCollector).first`. |
+Future<Uint8List> collectBytes(Stream<List<int>> source) async { |
+ List<List<int>> byteLists = []; |
nweiz
2017/01/25 23:00:40
"var byteLists = <List<int>>[]"
Also "var" for as
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Done.
(Even though I was actually deliberately cre
|
+ int length = 0; |
+ await for (var list in source) { |
nweiz
2017/01/25 23:00:40
I'm pretty sure "await for" is still very slow on
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Argh.
On one hand, I'm convinced it will never be
|
+ byteLists.add(list); |
+ length += list.length; |
+ } |
+ return _collect(length, byteLists); |
+} |
+ |
+// Join a lists of bytes with a known total length into a single [Uint8List]. |
+Uint8List _collect(int length, List<List<int>> byteLists) { |
+ var result = new Uint8List(length); |
+ int i = 0; |
+ for (var byteList in byteLists) { |
+ var end = i + byteList.length; |
+ result.setRange(i, end, byteList); |
+ i = end; |
+ } |
+ return result; |
+} |
+ |
+/// Collects byte lists from a stream into a single byte list stream. |
+StreamSubscription<Uint8List> _collectionTransformer( |
+ Stream<List<int>> stream, bool cancelOnError) { |
+ List<List<int>> byteLists = []; |
+ int length = 0; |
+ var controller = new StreamController<Uint8List>(sync: true); |
+ StreamSubscription sourceSubscription = stream.listen( |
+ (List<int> data) { |
+ byteLists.add(data); |
+ length += data.length; |
+ }, |
+ onError: controller.addError, |
+ onDone: () { |
+ controller.add(_collect(length, byteLists)); |
+ controller.close(); |
+ }); |
+ controller |
+ ..onPause = sourceSubscription.pause |
+ ..onResume = sourceSubscription.resume |
+ ..onCancel = sourceSubscription.cancel; |
+ return controller.stream.listen(null, cancelOnError: cancelOnError); |
+} |