Index: tests/lib_strong/async/slow_consumer3_test.dart |
diff --git a/tests/lib_strong/async/slow_consumer3_test.dart b/tests/lib_strong/async/slow_consumer3_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..be12e12ef2418f81753e574e8d1805b4bd0e7018 |
--- /dev/null |
+++ b/tests/lib_strong/async/slow_consumer3_test.dart |
@@ -0,0 +1,94 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// VMOptions=--old_gen_heap_size=64 |
+ |
+library slow_consumer3_test; |
+ |
+import 'package:async_helper/async_helper.dart'; |
+import "package:expect/expect.dart"; |
+import 'dart:async'; |
+ |
+const int KB = 1024; |
+const int MB = KB * KB; |
+const int GB = KB * KB * KB; |
+ |
+class SlowConsumer extends StreamConsumer { |
+ int receivedCount = 0; |
+ final int bytesPerSecond; |
+ final int bufferSize; |
+ final List bufferedData = []; |
+ int usedBufferSize = 0; |
+ int finalCount; |
+ |
+ SlowConsumer(int this.bytesPerSecond, int this.bufferSize); |
+ |
+ Future consume(Stream stream) { |
+ return addStream(stream).then((_) => close()); |
+ } |
+ |
+ Future addStream(Stream stream) { |
+ Completer result = new Completer(); |
+ var subscription; |
+ subscription = stream.listen( |
+ (List<int> data) { |
+ receivedCount += data.length; |
+ usedBufferSize += data.length; |
+ bufferedData.add(data); |
+ int currentBufferedDataLength = bufferedData.length; |
+ if (usedBufferSize > bufferSize) { |
+ subscription.pause(); |
+ usedBufferSize = 0; |
+ int ms = data.length * 1000 ~/ bytesPerSecond; |
+ Duration duration = new Duration(milliseconds: ms); |
+ new Timer(duration, () { |
+ for (int i = 0; i < currentBufferedDataLength; i++) { |
+ bufferedData[i] = null; |
+ } |
+ subscription.resume(); |
+ }); |
+ } |
+ }, |
+ onDone: () { |
+ finalCount = receivedCount; |
+ result.complete(receivedCount); |
+ }); |
+ return result.future; |
+ } |
+ |
+ Future close() { |
+ return new Future.value(finalCount); |
+ } |
+} |
+ |
+Stream<List> dataGenerator(int bytesTotal, int chunkSize) { |
+ int chunks = bytesTotal ~/ chunkSize; |
+ return new Stream.fromIterable(new Iterable.generate(chunks, (_) { |
+ // This assumes one byte per entry. In practice it will be more. |
+ return new List<int>(chunkSize); |
+ })); |
+} |
+ |
+main() { |
+ asyncStart(); |
+ // The data provider can deliver 800MBs of data as fast as it is |
+ // requested. The data is sent in 0.5MB chunks. The consumer has a buffer of |
+ // 3MB. That is, it can accept a few packages without pausing its input. |
+ // |
+ // Notice that we aren't really counting bytes, but words, since we use normal |
+ // lists where each entry takes up a full word. In 64-bit VMs this will be |
+ // 8 bytes per entry, so the 3*MB buffer is picked to stay below 32 actual |
+ // MiB. |
+ // |
+ // This test is limited to 32MB of heap-space (see VMOptions on top of the |
+ // file). If the consumer doesn't pause the data-provider it will run out of |
+ // heap-space. |
+ |
+ dataGenerator(100 * MB, 512 * KB) |
+ .pipe(new SlowConsumer(200 * MB, 3 * MB)) |
+ .then((count) { |
+ Expect.equals(100 * MB, count); |
+ asyncEnd(); |
+ }); |
+} |