Chromium Code Reviews| OLD | NEW |
|---|---|
| (Empty) | |
| 1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file | |
| 2 // for details. All rights reserved. Use of this source code is governed by a | |
| 3 // BSD-style license that can be found in the LICENSE file. | |
| 4 | |
| 5 import "dart:async"; | |
| 6 import "dart:typed_data"; | |
| 7 | |
| 8 /// A stream transformer that collects byte list events into a single list. | |
| 9 /// | |
| 10 /// The transformer accumulates lists of bytes, and when the source stream ends, | |
| 11 /// all the bytes are emitted as a single [Uint8List] event on the resulting | |
| 12 /// stream. | |
| 13 /// | |
| 14 /// If any of the input data are not valid bytes, they will be truncated to | |
| 15 /// an eight-bit unsigned value in the resulting list. | |
| 16 /// | |
| 17 /// Any errors events are forwarded directly to the resulting stream. | |
| 18 const StreamTransformer<List<int>, Uint8List> byteCollector = | |
| 19 const StreamTransformer<List<int>, Uint8List>(_collectionTransformer); | |
|
nweiz
2017/01/25 23:00:40
I'm not sure I see the use of a transformer here.
Lasse Reichstein Nielsen
2017/01/26 09:43:05
It might be a little too speculative :)
I was thin
| |
| 20 | |
| 21 /// Collects an asynchronous sequence of byte lists into a single list of bytes. | |
| 22 /// | |
| 23 /// If the [source] stream emits an error event, | |
| 24 /// the collection fails and the returned future completes with the same error. | |
| 25 /// | |
| 26 /// If any of the input data are not valid bytes, they will be truncated to | |
| 27 /// an eight-bit unsigned value in the resulting list. | |
| 28 /// | |
| 29 /// Equivalent to `source.transform(byteCollector).first`. | |
| 30 Future<Uint8List> collectBytes(Stream<List<int>> source) async { | |
| 31 List<List<int>> byteLists = []; | |
|
nweiz
2017/01/25 23:00:40
"var byteLists = <List<int>>[]"
Also "var" for as
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Done.
(Even though I was actually deliberately cre
| |
| 32 int length = 0; | |
| 33 await for (var list in source) { | |
|
nweiz
2017/01/25 23:00:40
I'm pretty sure "await for" is still very slow on
Lasse Reichstein Nielsen
2017/01/26 09:43:05
Argh.
On one hand, I'm convinced it will never be
| |
| 34 byteLists.add(list); | |
| 35 length += list.length; | |
| 36 } | |
| 37 return _collect(length, byteLists); | |
| 38 } | |
| 39 | |
| 40 // Join a lists of bytes with a known total length into a single [Uint8List]. | |
| 41 Uint8List _collect(int length, List<List<int>> byteLists) { | |
| 42 var result = new Uint8List(length); | |
| 43 int i = 0; | |
| 44 for (var byteList in byteLists) { | |
| 45 var end = i + byteList.length; | |
| 46 result.setRange(i, end, byteList); | |
| 47 i = end; | |
| 48 } | |
| 49 return result; | |
| 50 } | |
| 51 | |
| 52 /// Collects byte lists from a stream into a single byte list stream. | |
| 53 StreamSubscription<Uint8List> _collectionTransformer( | |
| 54 Stream<List<int>> stream, bool cancelOnError) { | |
| 55 List<List<int>> byteLists = []; | |
| 56 int length = 0; | |
| 57 var controller = new StreamController<Uint8List>(sync: true); | |
| 58 StreamSubscription sourceSubscription = stream.listen( | |
| 59 (List<int> data) { | |
| 60 byteLists.add(data); | |
| 61 length += data.length; | |
| 62 }, | |
| 63 onError: controller.addError, | |
| 64 onDone: () { | |
| 65 controller.add(_collect(length, byteLists)); | |
| 66 controller.close(); | |
| 67 }); | |
| 68 controller | |
| 69 ..onPause = sourceSubscription.pause | |
| 70 ..onResume = sourceSubscription.resume | |
| 71 ..onCancel = sourceSubscription.cancel; | |
| 72 return controller.stream.listen(null, cancelOnError: cancelOnError); | |
| 73 } | |
| OLD | NEW |