Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(165)

Side by Side Diff: tests/lib/async/slow_consumer2_test.dart

Issue 11783009: Big merge from experimental to bleeding edge. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
« no previous file with comments | « tests/lib/async/merge_stream_test.dart ('k') | tests/lib/async/slow_consumer_test.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file.
4
5 // VMOptions=--old_gen_heap_size=32
6
7 import 'dart:async';
8 import 'dart:isolate';
9
10 const int KB = 1024;
11 const int MB = KB * KB;
12 const int GB = KB * KB * KB;
13
14 class SlowConsumer extends StreamConsumer {
15 int receivedCount = 0;
16 final int bytesPerSecond;
17 final int bufferSize;
18 final List bufferedData = [];
19 int usedBufferSize = 0;
20
21 SlowConsumer(int this.bytesPerSecond, int this.bufferSize);
22
23 Future consume(Stream stream) {
24 Completer result = new Completer();
25 var subscription;
26 subscription = stream.listen(
27 (List<int> data) {
28 receivedCount += data.length;
29 usedBufferSize += data.length;
30 bufferedData.add(data);
31 int currentBufferedDataLength = bufferedData.length;
32 if (usedBufferSize > bufferSize) {
33 subscription.pause();
34 usedBufferSize = 0;
35 int ms = data.length * 1000 ~/ bytesPerSecond;
36 new Timer(ms, (_) {
37 for (int i = 0; i < currentBufferedDataLength; i++) {
38 bufferedData[i] = null;
39 }
40 subscription.resume();
41 });
42 }
43 },
44 onDone: () { result.complete(receivedCount); });
45 return result.future;
46 }
47 }
48
49 class DataProvider extends StreamController {
50 final int chunkSize;
51 final int bytesPerSecond;
52 int sentCount = 0;
53 int targetCount;
54
55 DataProvider(int this.bytesPerSecond, int this.targetCount, this.chunkSize) {
56 new Timer(0, (_) => send());
57 }
58
59 send() {
60 if (isPaused) return;
61 if (sentCount == targetCount) {
62 close();
63 return;
64 }
65 int listSize = chunkSize;
66 sentCount += listSize;
67 if (sentCount > targetCount) {
68 listSize -= sentCount - targetCount;
69 sentCount = targetCount;
70 }
71 add(new List.fixedLength(listSize));
72 int ms = listSize * 1000 ~/ bytesPerSecond;
73 if (!isPaused) new Timer(ms, (_) => send());
74 }
75
76 onPauseStateChange() {
77 // We don't care if we just unpaused or paused. In either case we just
78 // call send which will test it for us.
79 send();
80 }
81 }
82
83 main() {
84 var port = new ReceivePort();
85 // The data provider can deliver 800MB/s of data. It sends 100MB of data to
86 // the slower consumer who can only read 200MB/s. The data is sent in 1MB
87 // chunks. The consumer has a buffer of 5MB. That is, it can accept a few
88 // packages without pausing its input.
89 //
90 // This test is limited to 32MB of heap-space (see VMOptions on top of the
91 // file). If the consumer doesn't pause the data-provider it will run out of
92 // heap-space.
93
94 new DataProvider(800 * MB, 100 * MB, 1 * MB)
95 .pipe(new SlowConsumer(200 * MB, 5 * MB))
96 .then((count) {
97 port.close();
98 Expect.equals(100 * MB, count);
99 });
100 }
OLDNEW
« no previous file with comments | « tests/lib/async/merge_stream_test.dart ('k') | tests/lib/async/slow_consumer_test.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698