Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(126)

Side by Side Diff: packages/quiver/lib/src/streams/streambuffer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 4 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « packages/quiver/lib/src/streams/enumerate.dart ('k') | packages/quiver/lib/src/time/clock.dart » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
(Empty)
1 // Copyright 2014 Google Inc. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 part of quiver.streams;
16
17 /**
18 * Underflow errors happen when the socket feeding a buffer is finished while
19 * there are still blocked readers. Each reader will complete with this error.
20 */
21 class UnderflowError extends Error {
22 final message;
23
24 /// The [message] describes the underflow.
25 UnderflowError([this.message]);
26
27 String toString() {
28 if (message != null) {
29 return "StreamBuffer Underflow: $message";
30 }
31 return "StreamBuffer Underflow";
32 }
33 }
34
35 /**
36 * Allow orderly reading of elements from a datastream, such as Socket, which
37 * might not receive List<int> bytes regular chunks.
38 *
39 * Example usage:
40 * StreamBuffer<int> buffer = new StreamBuffer();
41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
42 * buffer.read(100).then((bytes) {
43 * // do something with 100 bytes;
44 * });
45 *
46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
47 * [Socket] disconnects.
48 */
49 class StreamBuffer<T> implements StreamConsumer {
50 List _chunks = [];
51 int _offset = 0;
52 int _counter = 0; // sum(_chunks[*].length) - _offset
53 List<_ReaderInWaiting<List<T>>> _readers = [];
54 StreamSubscription<T> _sub;
55 Completer _streamDone;
56
57 final bool _throwOnError;
58
59 Stream _currentStream;
60
61 int _limit = 0;
62
63 set limit(int limit) {
64 _limit = limit;
65 if (_sub != null) {
66 if (!limited || _counter < limit) {
67 _sub.resume();
68 } else {
69 _sub.pause();
70 }
71 }
72 }
73
74 int get limit => _limit;
75
76 bool get limited => _limit > 0;
77
78 /**
79 * Create a stream buffer with optional, soft [limit] to the amount of data
80 * the buffer will hold before pausing the underlying stream. A limit of 0
81 * means no buffer limits.
82 */
83 StreamBuffer({bool throwOnError: false, int limit: 0})
84 : this._throwOnError = throwOnError,
85 this._limit = limit;
86
87 /**
88 * The amount of unread data buffered.
89 */
90 int get buffered => _counter;
91
92 List<T> _consume(int size) {
93 var follower = 0;
94 var ret = new List(size);
95 var leftToRead = size;
96 while (leftToRead > 0) {
97 var chunk = _chunks.first;
98 var listCap = (chunk is List) ? chunk.length - _offset : 1;
99 var subsize = leftToRead > listCap ? listCap : leftToRead;
100 if (chunk is List) {
101 ret.setRange(follower, follower + subsize,
102 chunk.getRange(_offset, _offset + subsize));
103 } else {
104 ret[follower] = chunk;
105 }
106 follower += subsize;
107 _offset += subsize;
108 _counter -= subsize;
109 leftToRead -= subsize;
110 if (chunk is! List || _offset >= chunk.length) {
111 _offset = 0;
112 _chunks.removeAt(0);
113 }
114 }
115 if (limited && _sub.isPaused && _counter < limit) {
116 _sub.resume();
117 }
118 return ret;
119 }
120
121 /**
122 * Read fully [size] bytes from the stream and return in the future.
123 *
124 * Throws [ArgumentError] if size is larger than optional buffer [limit].
125 */
126 Future<List<T>> read(int size) {
127 if (limited && size > limit) {
128 throw new ArgumentError("Cannot read $size with limit $limit");
129 }
130
131 // If we have enough data to consume and there are no other readers, then
132 // we can return immediately.
133 if (size <= buffered && _readers.isEmpty) {
134 return new Future.value(_consume(size));
135 }
136 Completer completer = new Completer<List<T>>();
137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer));
138 return completer.future;
139 }
140
141 @override
142 Future addStream(Stream<T> stream) {
143 var lastStream = _currentStream == null ? stream : _currentStream;
144 if (_sub != null) {
145 _sub.cancel();
146 _streamDone.complete();
147 }
148 _currentStream = stream;
149 Completer streamDone = new Completer();
150 _sub = stream.listen((items) {
151 _chunks.add(items);
152 _counter += items is List ? items.length : 1;
153 if (limited && _counter >= limit) {
154 _sub.pause();
155 }
156
157 while (_readers.isNotEmpty && _readers.first.size <= _counter) {
158 var waiting = _readers.removeAt(0);
159 waiting.completer.complete(_consume(waiting.size));
160 }
161 }, onDone: () {
162 // User is piping in a new stream
163 if (stream == lastStream && _throwOnError) {
164 _closed(new UnderflowError());
165 }
166 streamDone.complete();
167 }, onError: (e) {
168 _closed(e);
169 });
170 return streamDone.future;
171 }
172
173 _closed(e) {
174 for (var reader in _readers) {
175 if (!reader.completer.isCompleted) {
176 reader.completer.completeError(e);
177 }
178 }
179 _readers.clear();
180 }
181
182 Future close() {
183 var ret;
184 if (_sub != null) {
185 ret = _sub.cancel();
186 _sub = null;
187 }
188 return ret is Future ? ret : new Future.value();
189 }
190 }
191
192 class _ReaderInWaiting<T> {
193 int size;
194 Completer<T> completer;
195 _ReaderInWaiting(this.size, this.completer);
196 }
OLDNEW
« no previous file with comments | « packages/quiver/lib/src/streams/enumerate.dart ('k') | packages/quiver/lib/src/time/clock.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698