Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(251)

Side by Side Diff: lib/src/byte_collector.dart

Issue 2649233006: Add `byteCollector` stream transformer and `collectBytes` function. (Closed)
Patch Set: Created 3 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
(Empty)
1 // Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 import "dart:async";
6 import "dart:typed_data";
7
8 /// A stream transformer that collects byte list events into a single list.
9 ///
10 /// The transformer accumulates lists of bytes, and when the source stream ends,
11 /// all the bytes are emitted as a single [Uint8List] event on the resulting
12 /// stream.
13 ///
14 /// If any of the input data are not valid bytes, they will be truncated to
15 /// an eight-bit unsigned value in the resulting list.
16 ///
17 /// Any errors events are forwarded directly to the resulting stream.
18 const StreamTransformer<List<int>, Uint8List> byteCollector =
19 const StreamTransformer<List<int>, Uint8List>(_collectionTransformer);
nweiz 2017/01/25 23:00:40 I'm not sure I see the use of a transformer here.
Lasse Reichstein Nielsen 2017/01/26 09:43:05 It might be a little too speculative :) I was thin
20
21 /// Collects an asynchronous sequence of byte lists into a single list of bytes.
22 ///
23 /// If the [source] stream emits an error event,
24 /// the collection fails and the returned future completes with the same error.
25 ///
26 /// If any of the input data are not valid bytes, they will be truncated to
27 /// an eight-bit unsigned value in the resulting list.
28 ///
29 /// Equivalent to `source.transform(byteCollector).first`.
30 Future<Uint8List> collectBytes(Stream<List<int>> source) async {
31 List<List<int>> byteLists = [];
nweiz 2017/01/25 23:00:40 "var byteLists = <List<int>>[]" Also "var" for as
Lasse Reichstein Nielsen 2017/01/26 09:43:05 Done. (Even though I was actually deliberately cre
32 int length = 0;
33 await for (var list in source) {
nweiz 2017/01/25 23:00:40 I'm pretty sure "await for" is still very slow on
Lasse Reichstein Nielsen 2017/01/26 09:43:05 Argh. On one hand, I'm convinced it will never be
34 byteLists.add(list);
35 length += list.length;
36 }
37 return _collect(length, byteLists);
38 }
39
40 // Join a lists of bytes with a known total length into a single [Uint8List].
41 Uint8List _collect(int length, List<List<int>> byteLists) {
42 var result = new Uint8List(length);
43 int i = 0;
44 for (var byteList in byteLists) {
45 var end = i + byteList.length;
46 result.setRange(i, end, byteList);
47 i = end;
48 }
49 return result;
50 }
51
52 /// Collects byte lists from a stream into a single byte list stream.
53 StreamSubscription<Uint8List> _collectionTransformer(
54 Stream<List<int>> stream, bool cancelOnError) {
55 List<List<int>> byteLists = [];
56 int length = 0;
57 var controller = new StreamController<Uint8List>(sync: true);
58 StreamSubscription sourceSubscription = stream.listen(
59 (List<int> data) {
60 byteLists.add(data);
61 length += data.length;
62 },
63 onError: controller.addError,
64 onDone: () {
65 controller.add(_collect(length, byteLists));
66 controller.close();
67 });
68 controller
69 ..onPause = sourceSubscription.pause
70 ..onResume = sourceSubscription.resume
71 ..onCancel = sourceSubscription.cancel;
72 return controller.stream.listen(null, cancelOnError: cancelOnError);
73 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698