OLD | NEW |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 // VMOptions=--old_gen_heap_size=64 | 5 // VMOptions=--old_gen_heap_size=64 |
6 | 6 |
7 library slow_consumer_test; | 7 library slow_consumer_test; |
8 | 8 |
9 import 'package:async_helper/async_helper.dart'; | 9 import 'package:async_helper/async_helper.dart'; |
10 import "package:expect/expect.dart"; | 10 import "package:expect/expect.dart"; |
(...skipping 11 matching lines...) Expand all Loading... |
22 SlowConsumer(int this.bytesPerSecond); | 22 SlowConsumer(int this.bytesPerSecond); |
23 | 23 |
24 Future consume(Stream stream) { | 24 Future consume(Stream stream) { |
25 return addStream(stream).then((_) => close()); | 25 return addStream(stream).then((_) => close()); |
26 } | 26 } |
27 | 27 |
28 Future addStream(Stream stream) { | 28 Future addStream(Stream stream) { |
29 bool done = false; | 29 bool done = false; |
30 Completer completer = new Completer(); | 30 Completer completer = new Completer(); |
31 var subscription; | 31 var subscription; |
32 subscription = stream.listen( | 32 subscription = stream.listen((List<int> data) { |
33 (List<int> data) { | 33 current = current.then((count) { |
34 current = current | 34 // Simulated amount of time it takes to handle the data. |
35 .then((count) { | 35 int ms = data.length * 1000 ~/ bytesPerSecond; |
36 // Simulated amount of time it takes to handle the data. | 36 Duration duration = new Duration(milliseconds: ms); |
37 int ms = data.length * 1000 ~/ bytesPerSecond; | 37 if (!done) subscription.pause(); |
38 Duration duration = new Duration(milliseconds: ms); | 38 return new Future.delayed(duration, () { |
39 if (!done) subscription.pause(); | 39 if (!done) subscription.resume(); |
40 return new Future.delayed(duration, () { | 40 // Make sure we use data here to keep tracking it. |
41 if (!done) subscription.resume(); | 41 return count + data.length; |
42 // Make sure we use data here to keep tracking it. | |
43 return count + data.length; | |
44 }); | |
45 }); | |
46 }, | |
47 onDone: () { | |
48 done = true; | |
49 current.then((count) { | |
50 finalCount = count; | |
51 completer.complete(count); | |
52 }); | 42 }); |
53 }); | 43 }); |
| 44 }, onDone: () { |
| 45 done = true; |
| 46 current.then((count) { |
| 47 finalCount = count; |
| 48 completer.complete(count); |
| 49 }); |
| 50 }); |
54 return completer.future; | 51 return completer.future; |
55 } | 52 } |
56 | 53 |
57 Future close() { | 54 Future close() { |
58 return new Future.value(finalCount); | 55 return new Future.value(finalCount); |
59 } | 56 } |
60 } | 57 } |
61 | 58 |
62 class DataProvider { | 59 class DataProvider { |
63 final int chunkSize; | 60 final int chunkSize; |
64 final int bytesPerSecond; | 61 final int bytesPerSecond; |
65 int sentCount = 0; | 62 int sentCount = 0; |
66 int targetCount; | 63 int targetCount; |
67 StreamController controller; | 64 StreamController controller; |
68 Timer pendingSend; | 65 Timer pendingSend; |
69 | 66 |
70 DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) { | 67 DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) { |
71 controller = new StreamController( | 68 controller = new StreamController( |
72 sync: true, | 69 sync: true, onPause: onPauseStateChange, onResume: onPauseStateChange); |
73 onPause: onPauseStateChange, | |
74 onResume: onPauseStateChange); | |
75 Timer.run(send); | 70 Timer.run(send); |
76 } | 71 } |
77 | 72 |
78 Stream get stream => controller.stream; | 73 Stream get stream => controller.stream; |
79 | 74 |
80 send() { | 75 send() { |
81 if (pendingSend != null) { | 76 if (pendingSend != null) { |
82 pendingSend.cancel(); | 77 pendingSend.cancel(); |
83 pendingSend = null; | 78 pendingSend = null; |
84 } | 79 } |
(...skipping 26 matching lines...) Expand all Loading... |
111 main() { | 106 main() { |
112 asyncStart(); | 107 asyncStart(); |
113 // The data provider can deliver 800MB/s of data. It sends 100MB of data to | 108 // The data provider can deliver 800MB/s of data. It sends 100MB of data to |
114 // the slower consumer who can only read 200MB/s. The data is sent in 1MB | 109 // the slower consumer who can only read 200MB/s. The data is sent in 1MB |
115 // chunks. | 110 // chunks. |
116 // | 111 // |
117 // This test is limited to 64MB of heap-space (see VMOptions on top of the | 112 // This test is limited to 64MB of heap-space (see VMOptions on top of the |
118 // file). If the consumer doesn't pause the data-provider it will run out of | 113 // file). If the consumer doesn't pause the data-provider it will run out of |
119 // heap-space. | 114 // heap-space. |
120 | 115 |
121 new DataProvider(800 * MB, 100 * MB, 1 * MB).stream | 116 new DataProvider(800 * MB, 100 * MB, 1 * MB) |
122 .pipe(new SlowConsumer(200 * MB)) | 117 .stream |
123 .then((count) { | 118 .pipe(new SlowConsumer(200 * MB)) |
124 Expect.equals(100 * MB, count); | 119 .then((count) { |
125 asyncEnd(); | 120 Expect.equals(100 * MB, count); |
126 }); | 121 asyncEnd(); |
| 122 }); |
127 } | 123 } |
OLD | NEW |