OLD | NEW |
---|---|
(Empty) | |
1 // Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 | |
6 /** | |
7 * A [StreamBuffer] is an unbounded buffer that is written to through the | |
8 * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer]. | |
9 */ | |
10 class StreamBuffer { | |
11 StreamBuffer() | |
Søren Gjesse
2012/08/10 07:58:10
I think the Dart style calls for four space indent
| |
12 : _data = [], | |
13 _closed = false, | |
14 _destroyed = false, | |
15 _available = 0, | |
16 fromBuffer = new _StreamBufferInputStream(), | |
17 toBuffer = new _StreamBufferOutputStream() { | |
18 fromBuffer._buffer = this; | |
19 toBuffer._buffer = this; | |
nweiz
2012/08/09 22:03:45
Style nit: I think this should be intended only fo
| |
20 } | |
21 | |
22 void destroy() { | |
23 _closed = true; | |
24 _closePending = true; | |
25 _data.clear(); | |
26 if (!_destroyed) { | |
27 new Timer(0, toBuffer._doOnDestroy); | |
28 } | |
29 _destroyed = true; | |
30 _available = 0; | |
31 if (fromBuffer._onDataScheduledEvent != null) { | |
32 fromBuffer._onDataScheduledEvent.cancel(); | |
33 } | |
34 if (fromBuffer._onClosedScheduledEvent != null) { | |
35 fromBuffer._onClosedScheduledEvent.cancel(); | |
nweiz
2012/08/09 22:03:45
As I mentioned in issue 4222, I don't understand w
Søren Gjesse
2012/08/10 07:58:10
Shouldn't it call onError? Destroy sounds like jus
nweiz
2012/09/04 23:50:31
That's not how InputStream#close or OutputStream#d
| |
36 } | |
37 } | |
38 | |
39 final _StreamBufferInputStream fromBuffer; | |
40 final _StreamBufferOutputStream toBuffer; | |
nweiz
2012/08/09 22:03:45
For tooling/documentation reasons, the types of th
| |
41 final List<List<int>> _data; | |
nweiz
2012/08/09 22:03:45
Would it be easier to make this a [_BufferList]? I
Søren Gjesse
2012/08/10 07:58:10
I agree _BufferList was made to fit this purpose.
| |
42 bool _closePending; | |
43 bool _closed; | |
44 bool _destroyed; | |
45 int _available; | |
46 } | |
47 | |
48 class _StreamBufferInputStream implements InputStream { | |
nweiz
2012/08/09 22:03:45
Why not extend [_BaseDataInputStream]?
| |
49 int available() => _buffer._available; | |
50 bool get closed() => _buffer._closed; | |
nweiz
2012/08/09 22:03:45
I'm a little confused how this relates to what you
Bill Hesse
2012/09/04 23:17:13
When close() is called on the output stream, _clos
nweiz
2012/09/04 23:50:31
It seems to be that (for an InputStream) exactly o
| |
51 | |
52 void set onData(void callback()) { | |
53 _onData = callback; | |
54 _scheduleCallbacks(); | |
55 } | |
56 | |
57 void set onClosed(void callback()) { | |
58 _onClosed = callback; | |
59 _scheduleCallbacks(); | |
60 } | |
61 | |
62 void set onError(void callback(e)) { | |
63 throw new StreamException( | |
64 "Error events not supported on StreamBuffer"); | |
nweiz
2012/08/09 22:03:45
It would be great if errors were supported (see is
| |
65 } | |
66 | |
67 List<int> read([int len]) { | |
68 if (closed || available() == 0) { | |
69 return const []; | |
70 } | |
71 if (len == null || len > available()) { | |
72 len = available(); | |
73 } | |
74 _buffer._available -= len; | |
75 var data = _buffer._data; | |
76 List<int> result; | |
77 if (len == data[0].length) { | |
78 result = data[0]; | |
79 data.removeRange(0, 1); | |
80 } else { | |
81 result = new List<int>(len); | |
82 int chunkIndex = 0; | |
83 int written = 0; | |
84 // Copy all the chunks except a possible partial last chunk. | |
85 while (len - written > 0 && | |
86 data[chunkIndex].length <= len - written) { | |
87 result.setRange(written, data[chunkIndex].length, data[chunkIndex]); | |
88 written += data[chunkIndex].length; | |
89 ++chunkIndex; | |
90 } | |
91 if (len - written > 0) { | |
92 // Copy the last partial chunk. | |
93 var last = data[chunkIndex]; | |
94 result.setRange(written, len - written, data[chunkIndex]); | |
95 data[chunkIndex].removeRange(0, len - written); | |
96 } | |
97 data.removeRange(0, chunkIndex); | |
98 } | |
99 _scheduleCallbacks(); | |
100 return result; | |
101 } | |
102 | |
103 int readInto(List<int> buffer, [int offset = 0, int len]) { | |
104 if (closed || available() == 0) { | |
105 return 0; | |
106 } | |
107 if (len == null) len = buffer.length - offset; | |
108 if (len > available()) { | |
109 len = available(); | |
110 } | |
111 | |
112 _buffer._available -= len; | |
113 var data = _buffer._data; | |
114 int pos = offset; | |
115 int remaining = len; | |
116 int chunkIndex = 0; | |
117 int removedChunks = 0; | |
118 while (remaining > 0) { | |
119 var chunk = data[chunkIndex]; | |
120 int bytes = chunk.length; | |
121 if (remaining >= bytes) { | |
122 buffer.setRange(pos, bytes, chunk); | |
123 pos += bytes; | |
124 remaining -= bytes; | |
125 ++removedChunks; | |
126 ++chunkIndex; | |
127 } else { | |
128 buffer.setRange(pos, remaining, chunk); | |
129 chunk.removeRange(0, remaining); | |
130 remaining = 0; | |
131 } | |
132 } | |
133 data.removeRange(0, removedChunks); | |
134 _scheduleCallbacks(); | |
135 return len; | |
136 } | |
137 | |
138 void close() { | |
139 _buffer.destroy(); | |
140 } | |
141 | |
142 void pipe(OutputStream output, [bool close]) { | |
nweiz
2012/08/09 22:03:45
Don't you get this for free from [_pipe] in stream
| |
143 // Not implemented yet. | |
144 } | |
145 | |
146 void _scheduleCallbacks() { | |
147 if (closed) return; | |
148 if (available() > 0) { | |
149 if (_onDataScheduledEvent == null) { | |
150 _onDataScheduledEvent = new Timer(0, _doOnData); | |
151 } | |
152 } else { | |
153 if (_onDataScheduledEvent != null) { | |
154 _onDataScheduledEvent.cancel(); | |
155 } | |
156 if (_buffer._closePending && _onClosedScheduledEvent == null) { | |
157 _onClosedScheduledEvent = new Timer(0, _doOnClosed); | |
158 } | |
nweiz
2012/08/09 22:03:45
Extra closing brace
Bill Hesse
2012/09/04 23:17:13
Actually, wrong indentation.
| |
159 } | |
160 } | |
161 | |
162 void _doOnData(_) { | |
163 _onDataScheduledEvent = null; | |
164 if (_onData != null) { | |
165 _onData(); | |
166 } | |
167 } | |
168 | |
169 void _doOnClosed(_) { | |
170 _buffer._closed = true; | |
171 _onClosedScheduledEvent = null; | |
172 if (_onClosed != null) { | |
173 _onClosed(); | |
174 } | |
175 } | |
176 | |
177 StreamBuffer _buffer; | |
178 Function _onData; | |
179 Function _onClosed; | |
180 Timer _onDataScheduledEvent; | |
181 Timer _onClosedScheduledEvent; | |
182 | |
183 | |
nweiz
2012/08/09 22:03:45
Remove empty lines
| |
184 } | |
185 | |
186 class _StreamBufferOutputStream implements OutputStream { | |
187 bool write(List<int> buffer, [bool copyBuffer = true]) { | |
188 if (_buffer._destroyed) { | |
189 throw new StreamException("Writing to a destroyed StreamBuffer"); | |
nweiz
2012/08/09 22:03:45
Is it useful to distinguish this error from the st
| |
190 } | |
191 if (_buffer._closePending) { | |
192 throw new StreamException.streamClosed(); | |
193 } | |
194 | |
195 if (copyBuffer) { | |
196 _buffer._data.add(new List<int>.from(buffer)); | |
197 } else { | |
198 _buffer._data.add(buffer); | |
199 } | |
200 _buffer._available += buffer.length; | |
201 _buffer.fromBuffer._scheduleCallbacks(); | |
202 return true; | |
203 } | |
204 | |
205 bool writeFrom(List<int> buffer, [int offset, int len]) { | |
206 if (offset == null) offset = 0; | |
207 if (len > buffer.length - offset) len = buffer.length - offset; | |
208 return write(buffer.getRange(offset, len), copyBuffer: false); | |
209 } | |
210 | |
211 bool writeString(String string, [Encoding encoding]) { | |
212 // Not implemented yet. | |
213 return true; | |
214 } | |
215 | |
216 void close() { | |
217 _buffer._closePending = true; | |
218 _buffer.fromBuffer._scheduleCallbacks(); | |
219 } | |
220 | |
221 void destroy() { | |
222 _buffer.destroy(); | |
223 } | |
224 | |
225 void flush() { } | |
226 | |
227 void set onClosed(void callback()) { | |
nweiz
2012/08/09 22:03:45
The behavior of this callback doesn't match the de
| |
228 _onDestroy = callback; | |
229 } | |
230 | |
231 void set onError(void callback(e)) { | |
232 throw new StreamException( | |
233 "Error events not supported on StreamBuffer"); | |
234 } | |
235 | |
236 void set onNoPendingWrites(void callback()) { | |
nweiz
2012/08/09 22:03:45
It seems like this should have some implementation
| |
237 throw new StreamException( | |
238 "onNoPendingWrites handlers not supported on StreamBuffer"); | |
239 } | |
240 | |
241 void _doOnDestroy(_) { | |
242 if (_onDestroy != null) { | |
243 _onDestroy(); | |
244 } | |
245 } | |
246 | |
247 Function _onDestroy; | |
248 StreamBuffer _buffer; | |
249 } | |
nweiz
2012/08/09 22:03:45
Now that my [OutputStream.closed] CL is submitted,
| |
OLD | NEW |