Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(33)

Side by Side Diff: tests/lib/async/slow_consumer_test.dart

Issue 13680002: StreamConsumer has an addStream and a close functions. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Update comments. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tests/lib/async/slow_consumer3_test.dart ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // VMOptions=--old_gen_heap_size=64 5 // VMOptions=--old_gen_heap_size=64
6 6
7 library slow_consumer_test; 7 library slow_consumer_test;
8 8
9 import 'dart:async'; 9 import 'dart:async';
10 import 'dart:isolate'; 10 import 'dart:isolate';
11 11
12 const int KB = 1024; 12 const int KB = 1024;
13 const int MB = KB * KB; 13 const int MB = KB * KB;
14 const int GB = KB * KB * KB; 14 const int GB = KB * KB * KB;
15 15
16 class SlowConsumer extends StreamConsumer { 16 class SlowConsumer extends StreamConsumer {
17 var current = new Future.immediate(0); 17 var current = new Future.immediate(0);
18 final int bytesPerSecond; 18 final int bytesPerSecond;
19 int finalCount;
19 20
20 SlowConsumer(int this.bytesPerSecond); 21 SlowConsumer(int this.bytesPerSecond);
21 22
22 Future consume(Stream stream) { 23 Future consume(Stream stream) {
24 return addStream(stream).then((_) => close());
25 }
26
27 Future addStream(Stream stream) {
23 bool done = false; 28 bool done = false;
24 Completer completer = new Completer(); 29 Completer completer = new Completer();
25 var subscription; 30 var subscription;
26 subscription = stream.listen( 31 subscription = stream.listen(
27 (List<int> data) { 32 (List<int> data) {
28 current = current 33 current = current
29 .then((count) { 34 .then((count) {
30 // Simulated amount of time it takes to handle the data. 35 // Simulated amount of time it takes to handle the data.
31 int ms = data.length * 1000 ~/ bytesPerSecond; 36 int ms = data.length * 1000 ~/ bytesPerSecond;
32 Duration duration = new Duration(milliseconds: ms); 37 Duration duration = new Duration(milliseconds: ms);
33 if (!done) subscription.pause(); 38 if (!done) subscription.pause();
34 return new Future.delayed(duration, () { 39 return new Future.delayed(duration, () {
35 if (!done) subscription.resume(); 40 if (!done) subscription.resume();
36 // Make sure we use data here to keep tracking it. 41 // Make sure we use data here to keep tracking it.
37 return count + data.length; 42 return count + data.length;
38 }); 43 });
39 }); 44 });
40 }, 45 },
41 onDone: () { 46 onDone: () {
42 done = true; 47 done = true;
43 current.then((count) { completer.complete(count); }); 48 current.then((count) {
49 finalCount = count;
50 completer.complete(count);
51 });
44 }); 52 });
45 return completer.future; 53 return completer.future;
46 } 54 }
55
56 Future close() {
57 return new Future.immediate(finalCount);
58 }
47 } 59 }
48 60
49 class DataProvider { 61 class DataProvider {
50 final int chunkSize; 62 final int chunkSize;
51 final int bytesPerSecond; 63 final int bytesPerSecond;
52 int sentCount = 0; 64 int sentCount = 0;
53 int targetCount; 65 int targetCount;
54 StreamController controller; 66 StreamController controller;
55 Timer pendingSend; 67 Timer pendingSend;
56 68
(...skipping 45 matching lines...) Expand 10 before | Expand all | Expand 10 after
102 // file). If the consumer doesn't pause the data-provider it will run out of 114 // file). If the consumer doesn't pause the data-provider it will run out of
103 // heap-space. 115 // heap-space.
104 116
105 new DataProvider(800 * MB, 100 * MB, 1 * MB).stream 117 new DataProvider(800 * MB, 100 * MB, 1 * MB).stream
106 .pipe(new SlowConsumer(200 * MB)) 118 .pipe(new SlowConsumer(200 * MB))
107 .then((count) { 119 .then((count) {
108 port.close(); 120 port.close();
109 Expect.equals(100 * MB, count); 121 Expect.equals(100 * MB, count);
110 }); 122 });
111 } 123 }
OLDNEW
« no previous file with comments | « tests/lib/async/slow_consumer3_test.dart ('k') | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698