Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(56)

Side by Side Diff: tests/lib/async/slow_consumer_test.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // VMOptions=--old_gen_heap_size=32
6
7 import 'dart:async';
8 import 'dart:isolate';
9
10 const int KB = 1024;
11 const int MB = KB * KB;
12 const int GB = KB * KB * KB;
13
14 class SlowConsumer extends StreamConsumer {
15 var current = new Future.immediate(0);
16 final int bytesPerSecond;
17
18 SlowConsumer(int this.bytesPerSecond);
19
20 Future consume(Stream stream) {
21 Completer completer = new Completer();
22 var subscription;
23 subscription = stream.listen(
24 (List<int> data) {
25 current = current
26 .then((count) {
27 // Simulated amount of time it takes to handle the data.
28 int ms = data.length * 1000 ~/ bytesPerSecond;
29 subscription.pause();
30 return new Future.delayed(ms, () {
31 subscription.resume();
32 // Make sure we use data here to keep tracking it.
33 return count + data.length;
34 });
35 });
36 },
37 onDone: () { current.then((count) { completer.complete(count); }); });
38 return completer.future;
39 }
40 }
41
42 class DataProvider extends StreamController {
43 final int chunkSize;
44 final int bytesPerSecond;
45 int sentCount = 0;
46 int targetCount;
47
48 DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) {
49 new Timer(0, (_) => send());
50 }
51
52 send() {
53 if (isPaused) return;
54 if (sentCount == targetCount) {
55 close();
56 return;
57 }
58 int listSize = chunkSize;
59 sentCount += listSize;
60 if (sentCount > targetCount) {
61 listSize -= sentCount - targetCount;
62 sentCount = targetCount;
63 }
64 add(new List.fixedLength(listSize));
65 int ms = listSize * 1000 ~/ bytesPerSecond;
66 if (!isPaused) new Timer(ms, (_) => send());
67 }
68
69 onPauseStateChange() {
70 // We don't care if we just unpaused or paused. In either case we just
71 // call send which will test it for us.
72 send();
73 }
74 }
75
76 main() {
77 var port = new ReceivePort();
78 // The data provider can deliver 800MB/s of data. It sends 100MB of data to
79 // the slower consumer who can only read 200MB/s. The data is sent in 1MB
80 // chunks.
81 //
82 // This test is limited to 32MB of heap-space (see VMOptions on top of the
83 // file). If the consumer doesn't pause the data-provider it will run out of
84 // heap-space.
85
86 new DataProvider(800 * MB, 100 * MB, 1 * MB)
87 .pipe(new SlowConsumer(200 * MB))
88 .then((count) {
89 port.close();
90 Expect.equals(100 * MB, count);
91 });
92 }
OLDNEW
« no previous file with comments | « tests/lib/async/slow_consumer2_test.dart ('k') | tests/lib/async/stream_controller_async_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698