Index: tests/lib/async/slow_consumer2_test.dart |
diff --git a/tests/lib/async/slow_consumer2_test.dart b/tests/lib/async/slow_consumer2_test.dart |
new file mode 100644 |
index 0000000000000000000000000000000000000000..68f5b3c53d530c27eca13f9e69241e4d0cad123c |
--- /dev/null |
+++ b/tests/lib/async/slow_consumer2_test.dart |
@@ -0,0 +1,100 @@ |
+// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
+// for details. All rights reserved. Use of this source code is governed by a |
+// BSD-style license that can be found in the LICENSE file. |
+ |
+// VMOptions=--old_gen_heap_size=32 |
+ |
+import 'dart:async'; |
+import 'dart:isolate'; |
+ |
+const int KB = 1024; |
+const int MB = KB * KB; |
+const int GB = KB * KB * KB; |
+ |
+class SlowConsumer extends StreamConsumer { |
+ int receivedCount = 0; |
+ final int bytesPerSecond; |
+ final int bufferSize; |
+ final List bufferedData = []; |
+ int usedBufferSize = 0; |
+ |
+ SlowConsumer(int this.bytesPerSecond, int this.bufferSize); |
+ |
+ Future consume(Stream stream) { |
+ Completer result = new Completer(); |
+ var subscription; |
+ subscription = stream.listen( |
+ (List<int> data) { |
+ receivedCount += data.length; |
+ usedBufferSize += data.length; |
+ bufferedData.add(data); |
+ int currentBufferedDataLength = bufferedData.length; |
+ if (usedBufferSize > bufferSize) { |
+ subscription.pause(); |
+ usedBufferSize = 0; |
+ int ms = data.length * 1000 ~/ bytesPerSecond; |
+ new Timer(ms, (_) { |
+ for (int i = 0; i < currentBufferedDataLength; i++) { |
+ bufferedData[i] = null; |
+ } |
+ subscription.resume(); |
+ }); |
+ } |
+ }, |
+ onDone: () { result.complete(receivedCount); }); |
+ return result.future; |
+ } |
+} |
+ |
+class DataProvider extends StreamController { |
+ final int chunkSize; |
+ final int bytesPerSecond; |
+ int sentCount = 0; |
+ int targetCount; |
+ |
+ DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) { |
+ new Timer(0, (_) => send()); |
+ } |
+ |
+ send() { |
+ if (isPaused) return; |
+ if (sentCount == targetCount) { |
+ close(); |
+ return; |
+ } |
+ int listSize = chunkSize; |
+ sentCount += listSize; |
+ if (sentCount > targetCount) { |
+ listSize -= sentCount - targetCount; |
+ sentCount = targetCount; |
+ } |
+ add(new List.fixedLength(listSize)); |
+ int ms = listSize * 1000 ~/ bytesPerSecond; |
+ if (!isPaused) new Timer(ms, (_) => send()); |
+ } |
+ |
+ onPauseStateChange() { |
+ // We don't care if we just unpaused or paused. In either case we just |
+ // call send which will test it for us. |
+ send(); |
+ } |
+} |
+ |
+main() { |
+ var port = new ReceivePort(); |
+ // The data provider can deliver 800MB/s of data. It sends 100MB of data to |
+ // the slower consumer who can only read 200MB/s. The data is sent in 1MB |
+ // chunks. The consumer has a buffer of 5MB. That is, it can accept a few |
+ // packages without pausing its input. |
+ // |
+ // This test is limited to 32MB of heap-space (see VMOptions on top of the |
+ // file). If the consumer doesn't pause the data-provider it will run out of |
+ // heap-space. |
+ |
+ new DataProvider(800 * MB, 100 * MB, 1 * MB) |
+ .pipe(new SlowConsumer(200 * MB, 5 * MB)) |
+ .then((count) { |
+ port.close(); |
+ Expect.equals(100 * MB, count); |
+ }); |
+} |