Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(112)

Side by Side Diff: tests/lib/async/slow_consumer2_test.dart

Issue 11740027: Rename unsubscribe to cancel. (Closed) Base URL: https://dart.googlecode.com/svn/experimental/lib_v2/dart
Patch Set: Fix error message. Created 7 years, 11 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 // VMOptions=--old_gen_heap_size=32 5 // VMOptions=--old_gen_heap_size=32
6 6
7 import 'dart:async'; 7 import 'dart:async';
8 import 'dart:isolate'; 8 import 'dart:isolate';
9 9
10 const int KB = 1024; 10 const int KB = 1024;
11 const int MB = KB * KB; 11 const int MB = KB * KB;
12 const int GB = KB * KB * KB; 12 const int GB = KB * KB * KB;
13 13
14 class SlowConsumer extends StreamPipe { 14 class SlowConsumer extends StreamPipe {
15 int receivedCount = 0; 15 int receivedCount = 0;
16 final int bytesPerSecond; 16 final int bytesPerSecond;
17 final int bufferSize; 17 final int bufferSize;
18 final List bufferedData = []; 18 final List bufferedData = [];
19 int usedBufferSize = 0; 19 int usedBufferSize = 0;
20 20
21 SlowConsumer(int this.bytesPerSecond, int this.bufferSize); 21 SlowConsumer(int this.bytesPerSecond, int this.bufferSize);
22 22
23 Future bind(Stream stream) { 23 Future bind(Stream stream) {
24 Completer result = new Completer(); 24 Completer result = new Completer();
25 var subscription; 25 var subscription;
26 subscription = stream.subscribe( 26 subscription = stream.listen(
27 onData: (List<int> data) { 27 (List<int> data) {
28 receivedCount += data.length; 28 receivedCount += data.length;
29 usedBufferSize += data.length; 29 usedBufferSize += data.length;
30 bufferedData.add(data); 30 bufferedData.add(data);
31 int currentBufferedDataLength = bufferedData.length; 31 int currentBufferedDataLength = bufferedData.length;
32 if (usedBufferSize > bufferSize) { 32 if (usedBufferSize > bufferSize) {
33 subscription.pause(); 33 subscription.pause();
34 usedBufferSize = 0; 34 usedBufferSize = 0;
35 int ms = data.length * 1000 ~/ bytesPerSecond; 35 int ms = data.length * 1000 ~/ bytesPerSecond;
36 new Timer(ms, (_) { 36 new Timer(ms, (_) {
37 for (int i = 0; i < currentBufferedDataLength; i++) { 37 for (int i = 0; i < currentBufferedDataLength; i++) {
(...skipping 53 matching lines...) Expand 10 before | Expand all | Expand 10 after
91 // file). If the consumer doesn't pause the data-provider it will run out of 91 // file). If the consumer doesn't pause the data-provider it will run out of
92 // heap-space. 92 // heap-space.
93 93
94 new DataProvider(800 * MB, 100 * MB, 1 * MB) 94 new DataProvider(800 * MB, 100 * MB, 1 * MB)
95 .pipe(new SlowConsumer(200 * MB, 5 * MB)) 95 .pipe(new SlowConsumer(200 * MB, 5 * MB))
96 .then((count) { 96 .then((count) {
97 port.close(); 97 port.close();
98 Expect.equals(100 * MB, count); 98 Expect.equals(100 * MB, count);
99 }); 99 });
100 } 100 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698