| Index: tests/lib_strong/async/slow_consumer_test.dart
|
| diff --git a/tests/lib_strong/async/slow_consumer_test.dart b/tests/lib_strong/async/slow_consumer_test.dart
|
| deleted file mode 100644
|
| index 9cd3b9404ebbe91226962e3c41269c3626b9b41d..0000000000000000000000000000000000000000
|
| --- a/tests/lib_strong/async/slow_consumer_test.dart
|
| +++ /dev/null
|
| @@ -1,123 +0,0 @@
|
| -// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
|
| -// for details. All rights reserved. Use of this source code is governed by a
|
| -// BSD-style license that can be found in the LICENSE file.
|
| -
|
| -// VMOptions=--old_gen_heap_size=64
|
| -
|
| -library slow_consumer_test;
|
| -
|
| -import 'package:async_helper/async_helper.dart';
|
| -import "package:expect/expect.dart";
|
| -import 'dart:async';
|
| -
|
| -const int KB = 1024;
|
| -const int MB = KB * KB;
|
| -const int GB = KB * KB * KB;
|
| -
|
| -class SlowConsumer extends StreamConsumer {
|
| - var current = new Future.value(0);
|
| - final int bytesPerSecond;
|
| - int finalCount;
|
| -
|
| - SlowConsumer(int this.bytesPerSecond);
|
| -
|
| - Future consume(Stream stream) {
|
| - return addStream(stream).then((_) => close());
|
| - }
|
| -
|
| - Future addStream(Stream stream) {
|
| - bool done = false;
|
| - Completer completer = new Completer();
|
| - var subscription;
|
| - subscription = stream.listen((List<int> data) {
|
| - current = current.then((count) {
|
| - // Simulated amount of time it takes to handle the data.
|
| - int ms = data.length * 1000 ~/ bytesPerSecond;
|
| - Duration duration = new Duration(milliseconds: ms);
|
| - if (!done) subscription.pause();
|
| - return new Future.delayed(duration, () {
|
| - if (!done) subscription.resume();
|
| - // Make sure we use data here to keep tracking it.
|
| - return count + data.length;
|
| - });
|
| - });
|
| - }, onDone: () {
|
| - done = true;
|
| - current.then((count) {
|
| - finalCount = count;
|
| - completer.complete(count);
|
| - });
|
| - });
|
| - return completer.future;
|
| - }
|
| -
|
| - Future close() {
|
| - return new Future.value(finalCount);
|
| - }
|
| -}
|
| -
|
| -class DataProvider {
|
| - final int chunkSize;
|
| - final int bytesPerSecond;
|
| - int sentCount = 0;
|
| - int targetCount;
|
| - StreamController controller;
|
| - Timer pendingSend;
|
| -
|
| - DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) {
|
| - controller = new StreamController(
|
| - sync: true, onPause: onPauseStateChange, onResume: onPauseStateChange);
|
| - Timer.run(send);
|
| - }
|
| -
|
| - Stream get stream => controller.stream;
|
| -
|
| - send() {
|
| - if (pendingSend != null) {
|
| - pendingSend.cancel();
|
| - pendingSend = null;
|
| - }
|
| - if (controller.isPaused) return;
|
| - if (sentCount == targetCount) {
|
| - controller.close();
|
| - return;
|
| - }
|
| - int listSize = chunkSize;
|
| - sentCount += listSize;
|
| - if (sentCount > targetCount) {
|
| - listSize -= sentCount - targetCount;
|
| - sentCount = targetCount;
|
| - }
|
| - controller.add(new List(listSize));
|
| - int ms = listSize * 1000 ~/ bytesPerSecond;
|
| - Duration duration = new Duration(milliseconds: ms);
|
| - if (!controller.isPaused) {
|
| - pendingSend = new Timer(duration, send);
|
| - }
|
| - }
|
| -
|
| - onPauseStateChange() {
|
| - // We don't care if we just unpaused or paused. In either case we just
|
| - // call send which will test it for us.
|
| - send();
|
| - }
|
| -}
|
| -
|
| -main() {
|
| - asyncStart();
|
| - // The data provider can deliver 800MB/s of data. It sends 100MB of data to
|
| - // the slower consumer who can only read 200MB/s. The data is sent in 1MB
|
| - // chunks.
|
| - //
|
| - // This test is limited to 64MB of heap-space (see VMOptions on top of the
|
| - // file). If the consumer doesn't pause the data-provider it will run out of
|
| - // heap-space.
|
| -
|
| - new DataProvider(800 * MB, 100 * MB, 1 * MB)
|
| - .stream
|
| - .pipe(new SlowConsumer(200 * MB))
|
| - .then((count) {
|
| - Expect.equals(100 * MB, count);
|
| - asyncEnd();
|
| - });
|
| -}
|
|
|