Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(242)

Side by Side Diff: packages/quiver/lib/src/async/stream_buffer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
OLDNEW
1 // Copyright 2014 Google Inc. All Rights Reserved. 1 // Copyright 2014 Google Inc. All Rights Reserved.
2 // 2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); 3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License. 4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at 5 // You may obtain a copy of the License at
6 // 6 //
7 // http://www.apache.org/licenses/LICENSE-2.0 7 // http://www.apache.org/licenses/LICENSE-2.0
8 // 8 //
9 // Unless required by applicable law or agreed to in writing, software 9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, 10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and 12 // See the License for the specific language governing permissions and
13 // limitations under the License. 13 // limitations under the License.
14 14
15 part of quiver.streams; 15 part of quiver.async;
16 16
17 /** 17 /// Underflow errors happen when the socket feeding a buffer is finished while
18 * Underflow errors happen when the socket feeding a buffer is finished while 18 /// there are still blocked readers. Each reader will complete with this error.
19 * there are still blocked readers. Each reader will complete with this error.
20 */
21 class UnderflowError extends Error { 19 class UnderflowError extends Error {
22 final message; 20 final message;
23 21
24 /// The [message] describes the underflow. 22 /// The [message] describes the underflow.
25 UnderflowError([this.message]); 23 UnderflowError([this.message]);
26 24
27 String toString() { 25 String toString() {
28 if (message != null) { 26 if (message != null) {
29 return "StreamBuffer Underflow: $message"; 27 return "StreamBuffer Underflow: $message";
30 } 28 }
31 return "StreamBuffer Underflow"; 29 return "StreamBuffer Underflow";
32 } 30 }
33 } 31 }
34 32
35 /** 33 /// Allow orderly reading of elements from a datastream, such as Socket, which
36 * Allow orderly reading of elements from a datastream, such as Socket, which 34 /// might not receive List<int> bytes regular chunks.
37 * might not receive List<int> bytes regular chunks. 35 ///
38 * 36 /// Example usage:
39 * Example usage: 37 /// StreamBuffer<int> buffer = new StreamBuffer();
40 * StreamBuffer<int> buffer = new StreamBuffer(); 38 /// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); 39 /// buffer.read(100).then((bytes) {
42 * buffer.read(100).then((bytes) { 40 /// // do something with 100 bytes;
43 * // do something with 100 bytes; 41 /// });
44 * }); 42 ///
45 * 43 /// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected 44 /// [Socket] disconnects.
47 * [Socket] disconnects. 45 class StreamBuffer<T> implements StreamConsumer<T> {
48 */ 46 List<T> _chunks = [];
49 class StreamBuffer<T> implements StreamConsumer {
50 List _chunks = [];
51 int _offset = 0; 47 int _offset = 0;
52 int _counter = 0; // sum(_chunks[*].length) - _offset 48 int _counter = 0; // sum(_chunks[*].length) - _offset
53 List<_ReaderInWaiting<List<T>>> _readers = []; 49 List<_ReaderInWaiting<List<T>>> _readers = [];
54 StreamSubscription<T> _sub; 50 StreamSubscription<T> _sub;
55 Completer _streamDone; 51 Completer _streamDone;
56 52
57 final bool _throwOnError; 53 final bool _throwOnError;
58 54
59 Stream _currentStream; 55 Stream _currentStream;
60 56
61 int _limit = 0; 57 int _limit = 0;
62 58
63 set limit(int limit) { 59 set limit(int limit) {
64 _limit = limit; 60 _limit = limit;
65 if (_sub != null) { 61 if (_sub != null) {
66 if (!limited || _counter < limit) { 62 if (!limited || _counter < limit) {
67 _sub.resume(); 63 _sub.resume();
68 } else { 64 } else {
69 _sub.pause(); 65 _sub.pause();
70 } 66 }
71 } 67 }
72 } 68 }
73 69
74 int get limit => _limit; 70 int get limit => _limit;
75 71
76 bool get limited => _limit > 0; 72 bool get limited => _limit > 0;
77 73
78 /** 74 /// Create a stream buffer with optional, soft [limit] to the amount of data
79 * Create a stream buffer with optional, soft [limit] to the amount of data 75 /// the buffer will hold before pausing the underlying stream. A limit of 0
80 * the buffer will hold before pausing the underlying stream. A limit of 0 76 /// means no buffer limits.
81 * means no buffer limits.
82 */
83 StreamBuffer({bool throwOnError: false, int limit: 0}) 77 StreamBuffer({bool throwOnError: false, int limit: 0})
84 : this._throwOnError = throwOnError, 78 : this._throwOnError = throwOnError,
85 this._limit = limit; 79 this._limit = limit;
86 80
87 /** 81 /// The amount of unread data buffered.
88 * The amount of unread data buffered.
89 */
90 int get buffered => _counter; 82 int get buffered => _counter;
91 83
92 List<T> _consume(int size) { 84 List<T> _consume(int size) {
93 var follower = 0; 85 var follower = 0;
94 var ret = new List(size); 86 var ret = new List<T>(size);
95 var leftToRead = size; 87 var leftToRead = size;
96 while (leftToRead > 0) { 88 while (leftToRead > 0) {
97 var chunk = _chunks.first; 89 var chunk = _chunks.first;
98 var listCap = (chunk is List) ? chunk.length - _offset : 1; 90 var listCap = (chunk is List) ? chunk.length - _offset : 1;
99 var subsize = leftToRead > listCap ? listCap : leftToRead; 91 var subsize = leftToRead > listCap ? listCap : leftToRead;
100 if (chunk is List) { 92 if (chunk is List) {
101 ret.setRange(follower, follower + subsize, 93 ret.setRange(follower, follower + subsize,
102 chunk.getRange(_offset, _offset + subsize)); 94 (chunk as List<T>).getRange(_offset, _offset + subsize));
103 } else { 95 } else {
104 ret[follower] = chunk; 96 ret[follower] = chunk;
105 } 97 }
106 follower += subsize; 98 follower += subsize;
107 _offset += subsize; 99 _offset += subsize;
108 _counter -= subsize; 100 _counter -= subsize;
109 leftToRead -= subsize; 101 leftToRead -= subsize;
110 if (chunk is! List || _offset >= chunk.length) { 102 if (chunk is! List || _offset >= (chunk as List).length) {
111 _offset = 0; 103 _offset = 0;
112 _chunks.removeAt(0); 104 _chunks.removeAt(0);
113 } 105 }
114 } 106 }
115 if (limited && _sub.isPaused && _counter < limit) { 107 if (limited && _sub.isPaused && _counter < limit) {
116 _sub.resume(); 108 _sub.resume();
117 } 109 }
118 return ret; 110 return ret;
119 } 111 }
120 112
121 /** 113 /// Read fully [size] bytes from the stream and return in the future.
122 * Read fully [size] bytes from the stream and return in the future. 114 ///
123 * 115 /// Throws [ArgumentError] if size is larger than optional buffer [limit].
124 * Throws [ArgumentError] if size is larger than optional buffer [limit].
125 */
126 Future<List<T>> read(int size) { 116 Future<List<T>> read(int size) {
127 if (limited && size > limit) { 117 if (limited && size > limit) {
128 throw new ArgumentError("Cannot read $size with limit $limit"); 118 throw new ArgumentError("Cannot read $size with limit $limit");
129 } 119 }
130 120
131 // If we have enough data to consume and there are no other readers, then 121 // If we have enough data to consume and there are no other readers, then
132 // we can return immediately. 122 // we can return immediately.
133 if (size <= buffered && _readers.isEmpty) { 123 if (size <= buffered && _readers.isEmpty) {
134 return new Future.value(_consume(size)); 124 return new Future.value(_consume(size));
135 } 125 }
136 Completer completer = new Completer<List<T>>(); 126 final completer = new Completer<List<T>>();
137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); 127 _readers.add(new _ReaderInWaiting<List<T>>(size, completer));
138 return completer.future; 128 return completer.future;
139 } 129 }
140 130
141 @override 131 @override
142 Future addStream(Stream<T> stream) { 132 Future addStream(Stream<T> stream) {
143 var lastStream = _currentStream == null ? stream : _currentStream; 133 var lastStream = _currentStream == null ? stream : _currentStream;
144 if (_sub != null) { 134 if (_sub != null) {
145 _sub.cancel(); 135 _sub.cancel();
146 _streamDone.complete(); 136 _streamDone.complete();
(...skipping 10 matching lines...) Expand all
157 while (_readers.isNotEmpty && _readers.first.size <= _counter) { 147 while (_readers.isNotEmpty && _readers.first.size <= _counter) {
158 var waiting = _readers.removeAt(0); 148 var waiting = _readers.removeAt(0);
159 waiting.completer.complete(_consume(waiting.size)); 149 waiting.completer.complete(_consume(waiting.size));
160 } 150 }
161 }, onDone: () { 151 }, onDone: () {
162 // User is piping in a new stream 152 // User is piping in a new stream
163 if (stream == lastStream && _throwOnError) { 153 if (stream == lastStream && _throwOnError) {
164 _closed(new UnderflowError()); 154 _closed(new UnderflowError());
165 } 155 }
166 streamDone.complete(); 156 streamDone.complete();
167 }, onError: (e) { 157 }, onError: (e, stack) {
168 _closed(e); 158 _closed(e, stack);
169 }); 159 });
170 return streamDone.future; 160 return streamDone.future;
171 } 161 }
172 162
173 _closed(e) { 163 void _closed(e, [StackTrace stack]) {
174 for (var reader in _readers) { 164 for (var reader in _readers) {
175 if (!reader.completer.isCompleted) { 165 if (!reader.completer.isCompleted) {
176 reader.completer.completeError(e); 166 reader.completer.completeError(e, stack);
177 } 167 }
178 } 168 }
179 _readers.clear(); 169 _readers.clear();
180 } 170 }
181 171
182 Future close() { 172 Future close() {
183 var ret; 173 var ret;
184 if (_sub != null) { 174 if (_sub != null) {
185 ret = _sub.cancel(); 175 ret = _sub.cancel();
186 _sub = null; 176 _sub = null;
187 } 177 }
188 return ret is Future ? ret : new Future.value(); 178 return ret is Future ? ret : new Future.value();
189 } 179 }
190 } 180 }
191 181
192 class _ReaderInWaiting<T> { 182 class _ReaderInWaiting<T> {
193 int size; 183 int size;
194 Completer<T> completer; 184 Completer<T> completer;
195 _ReaderInWaiting(this.size, this.completer); 185 _ReaderInWaiting(this.size, this.completer);
196 } 186 }
OLDNEW
« no previous file with comments | « packages/quiver/lib/src/async/metronome.dart ('k') | packages/quiver/lib/src/async/stream_router.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698