OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 import "dart:async"; | |
6 import "dart:typed_data"; | |
7 | |
8 /// A stream transformer that collects byte list events into a single list. | |
9 /// | |
10 /// The transformer accumulates lists of bytes, and when the source stream ends, | |
11 /// all the bytes are emitted as a single [Uint8List] event on the resulting | |
12 /// stream. | |
13 /// | |
14 /// If any of the input data are not valid bytes, they will be truncated to | |
15 /// an eight-bit unsigned value in the resulting list. | |
16 /// | |
17 /// Any errors events are forwarded directly to the resulting stream. | |
18 const StreamTransformer<List<int>, Uint8List> byteCollector = | |
19 const StreamTransformer<List<int>, Uint8List>(_collectionTransformer); | |
nweiz
2017/01/25 23:00:40
I'm not sure I see the use of a transformer here.
Lasse Reichstein Nielsen
2017/01/26 09:43:05
It might be a little too speculative :)
I was thin
| |
20 | |
21 /// Collects an asynchronous sequence of byte lists into a single list of bytes. | |
22 /// | |
23 /// If the [source] stream emits an error event, | |
24 /// the collection fails and the returned future completes with the same error. | |
25 /// | |
26 /// If any of the input data are not valid bytes, they will be truncated to | |
27 /// an eight-bit unsigned value in the resulting list. | |
28 /// | |
29 /// Equivalent to `source.transform(byteCollector).first`. | |
30 Future<Uint8List> collectBytes(Stream<List<int>> source) async { | |
31 List<List<int>> byteLists = []; | |
nweiz
2017/01/25 23:00:40
"var byteLists = <List<int>>[]"
Also "var" for as
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Done.
(Even though I was actually deliberately cre
| |
32 int length = 0; | |
33 await for (var list in source) { | |
nweiz
2017/01/25 23:00:40
I'm pretty sure "await for" is still very slow on
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Argh.
On one hand, I'm convinced it will never be
| |
34 byteLists.add(list); | |
35 length += list.length; | |
36 } | |
37 return _collect(length, byteLists); | |
38 } | |
39 | |
40 // Join a lists of bytes with a known total length into a single [Uint8List]. | |
41 Uint8List _collect(int length, List<List<int>> byteLists) { | |
42 var result = new Uint8List(length); | |
43 int i = 0; | |
44 for (var byteList in byteLists) { | |
45 var end = i + byteList.length; | |
46 result.setRange(i, end, byteList); | |
47 i = end; | |
48 } | |
49 return result; | |
50 } | |
51 | |
52 /// Collects byte lists from a stream into a single byte list stream. | |
53 StreamSubscription<Uint8List> _collectionTransformer( | |
54 Stream<List<int>> stream, bool cancelOnError) { | |
55 List<List<int>> byteLists = []; | |
56 int length = 0; | |
57 var controller = new StreamController<Uint8List>(sync: true); | |
58 StreamSubscription sourceSubscription = stream.listen( | |
59 (List<int> data) { | |
60 byteLists.add(data); | |
61 length += data.length; | |
62 }, | |
63 onError: controller.addError, | |
64 onDone: () { | |
65 controller.add(_collect(length, byteLists)); | |
66 controller.close(); | |
67 }); | |
68 controller | |
69 ..onPause = sourceSubscription.pause | |
70 ..onResume = sourceSubscription.resume | |
71 ..onCancel = sourceSubscription.cancel; | |
72 return controller.stream.listen(null, cancelOnError: cancelOnError); | |
73 } | |
OLD | NEW |