Chromium Code Reviews| Index: runtime/bin/stream_buffer.dart |
| diff --git a/runtime/bin/stream_buffer.dart b/runtime/bin/stream_buffer.dart |
| new file mode 100644 |
| index 0000000000000000000000000000000000000000..5df701c93c2abceffa14b867fc842095e2f97ae1 |
| --- /dev/null |
| +++ b/runtime/bin/stream_buffer.dart |
| @@ -0,0 +1,249 @@ |
| +// Copyright (c) 2012, the Dart project authors. Please see the AUTHORS file |
| +// for details. All rights reserved. Use of this source code is governed by a |
| +// BSD-style license that can be found in the LICENSE file. |
| + |
| + |
| +/** |
| + * A [StreamBuffer] is an unbounded buffer that is written to through the |
| + * [OutputStream] [toBuffer] and is read from the [InputStream] [fromBuffer]. |
| + */ |
| +class StreamBuffer { |
| + StreamBuffer() |
|
Søren Gjesse
2012/08/10 07:58:10
I think the Dart style calls for four space indent
|
| + : _data = [], |
| + _closed = false, |
| + _destroyed = false, |
| + _available = 0, |
| + fromBuffer = new _StreamBufferInputStream(), |
| + toBuffer = new _StreamBufferOutputStream() { |
| + fromBuffer._buffer = this; |
| + toBuffer._buffer = this; |
|
nweiz
2012/08/09 22:03:45
Style nit: I think this should be intended only fo
|
| + } |
| + |
| + void destroy() { |
| + _closed = true; |
| + _closePending = true; |
| + _data.clear(); |
| + if (!_destroyed) { |
| + new Timer(0, toBuffer._doOnDestroy); |
| + } |
| + _destroyed = true; |
| + _available = 0; |
| + if (fromBuffer._onDataScheduledEvent != null) { |
| + fromBuffer._onDataScheduledEvent.cancel(); |
| + } |
| + if (fromBuffer._onClosedScheduledEvent != null) { |
| + fromBuffer._onClosedScheduledEvent.cancel(); |
|
nweiz
2012/08/09 22:03:45
As I mentioned in issue 4222, I don't understand w
Søren Gjesse
2012/08/10 07:58:10
Shouldn't it call onError? Destroy sounds like jus
nweiz
2012/09/04 23:50:31
That's not how InputStream#close or OutputStream#d
|
| + } |
| + } |
| + |
| + final _StreamBufferInputStream fromBuffer; |
| + final _StreamBufferOutputStream toBuffer; |
|
nweiz
2012/08/09 22:03:45
For tooling/documentation reasons, the types of th
|
| + final List<List<int>> _data; |
|
nweiz
2012/08/09 22:03:45
Would it be easier to make this a [_BufferList]? I
Søren Gjesse
2012/08/10 07:58:10
I agree _BufferList was made to fit this purpose.
|
| + bool _closePending; |
| + bool _closed; |
| + bool _destroyed; |
| + int _available; |
| +} |
| + |
| +class _StreamBufferInputStream implements InputStream { |
|
nweiz
2012/08/09 22:03:45
Why not extend [_BaseDataInputStream]?
|
| + int available() => _buffer._available; |
| + bool get closed() => _buffer._closed; |
|
nweiz
2012/08/09 22:03:45
I'm a little confused how this relates to what you
Bill Hesse
2012/09/04 23:17:13
When close() is called on the output stream, _clos
nweiz
2012/09/04 23:50:31
It seems to be that (for an InputStream) exactly o
|
| + |
| + void set onData(void callback()) { |
| + _onData = callback; |
| + _scheduleCallbacks(); |
| + } |
| + |
| + void set onClosed(void callback()) { |
| + _onClosed = callback; |
| + _scheduleCallbacks(); |
| + } |
| + |
| + void set onError(void callback(e)) { |
| + throw new StreamException( |
| + "Error events not supported on StreamBuffer"); |
|
nweiz
2012/08/09 22:03:45
It would be great if errors were supported (see is
|
| + } |
| + |
| + List<int> read([int len]) { |
| + if (closed || available() == 0) { |
| + return const []; |
| + } |
| + if (len == null || len > available()) { |
| + len = available(); |
| + } |
| + _buffer._available -= len; |
| + var data = _buffer._data; |
| + List<int> result; |
| + if (len == data[0].length) { |
| + result = data[0]; |
| + data.removeRange(0, 1); |
| + } else { |
| + result = new List<int>(len); |
| + int chunkIndex = 0; |
| + int written = 0; |
| + // Copy all the chunks except a possible partial last chunk. |
| + while (len - written > 0 && |
| + data[chunkIndex].length <= len - written) { |
| + result.setRange(written, data[chunkIndex].length, data[chunkIndex]); |
| + written += data[chunkIndex].length; |
| + ++chunkIndex; |
| + } |
| + if (len - written > 0) { |
| + // Copy the last partial chunk. |
| + var last = data[chunkIndex]; |
| + result.setRange(written, len - written, data[chunkIndex]); |
| + data[chunkIndex].removeRange(0, len - written); |
| + } |
| + data.removeRange(0, chunkIndex); |
| + } |
| + _scheduleCallbacks(); |
| + return result; |
| + } |
| + |
| + int readInto(List<int> buffer, [int offset = 0, int len]) { |
| + if (closed || available() == 0) { |
| + return 0; |
| + } |
| + if (len == null) len = buffer.length - offset; |
| + if (len > available()) { |
| + len = available(); |
| + } |
| + |
| + _buffer._available -= len; |
| + var data = _buffer._data; |
| + int pos = offset; |
| + int remaining = len; |
| + int chunkIndex = 0; |
| + int removedChunks = 0; |
| + while (remaining > 0) { |
| + var chunk = data[chunkIndex]; |
| + int bytes = chunk.length; |
| + if (remaining >= bytes) { |
| + buffer.setRange(pos, bytes, chunk); |
| + pos += bytes; |
| + remaining -= bytes; |
| + ++removedChunks; |
| + ++chunkIndex; |
| + } else { |
| + buffer.setRange(pos, remaining, chunk); |
| + chunk.removeRange(0, remaining); |
| + remaining = 0; |
| + } |
| + } |
| + data.removeRange(0, removedChunks); |
| + _scheduleCallbacks(); |
| + return len; |
| + } |
| + |
| + void close() { |
| + _buffer.destroy(); |
| + } |
| + |
| + void pipe(OutputStream output, [bool close]) { |
|
nweiz
2012/08/09 22:03:45
Don't you get this for free from [_pipe] in stream
|
| + // Not implemented yet. |
| + } |
| + |
| + void _scheduleCallbacks() { |
| + if (closed) return; |
| + if (available() > 0) { |
| + if (_onDataScheduledEvent == null) { |
| + _onDataScheduledEvent = new Timer(0, _doOnData); |
| + } |
| + } else { |
| + if (_onDataScheduledEvent != null) { |
| + _onDataScheduledEvent.cancel(); |
| + } |
| + if (_buffer._closePending && _onClosedScheduledEvent == null) { |
| + _onClosedScheduledEvent = new Timer(0, _doOnClosed); |
| + } |
|
nweiz
2012/08/09 22:03:45
Extra closing brace
Bill Hesse
2012/09/04 23:17:13
Actually, wrong indentation.
|
| + } |
| + } |
| + |
| + void _doOnData(_) { |
| + _onDataScheduledEvent = null; |
| + if (_onData != null) { |
| + _onData(); |
| + } |
| + } |
| + |
| + void _doOnClosed(_) { |
| + _buffer._closed = true; |
| + _onClosedScheduledEvent = null; |
| + if (_onClosed != null) { |
| + _onClosed(); |
| + } |
| + } |
| + |
| + StreamBuffer _buffer; |
| + Function _onData; |
| + Function _onClosed; |
| + Timer _onDataScheduledEvent; |
| + Timer _onClosedScheduledEvent; |
| + |
| + |
|
nweiz
2012/08/09 22:03:45
Remove empty lines
|
| +} |
| + |
| +class _StreamBufferOutputStream implements OutputStream { |
| + bool write(List<int> buffer, [bool copyBuffer = true]) { |
| + if (_buffer._destroyed) { |
| + throw new StreamException("Writing to a destroyed StreamBuffer"); |
|
nweiz
2012/08/09 22:03:45
Is it useful to distinguish this error from the st
|
| + } |
| + if (_buffer._closePending) { |
| + throw new StreamException.streamClosed(); |
| + } |
| + |
| + if (copyBuffer) { |
| + _buffer._data.add(new List<int>.from(buffer)); |
| + } else { |
| + _buffer._data.add(buffer); |
| + } |
| + _buffer._available += buffer.length; |
| + _buffer.fromBuffer._scheduleCallbacks(); |
| + return true; |
| + } |
| + |
| + bool writeFrom(List<int> buffer, [int offset, int len]) { |
| + if (offset == null) offset = 0; |
| + if (len > buffer.length - offset) len = buffer.length - offset; |
| + return write(buffer.getRange(offset, len), copyBuffer: false); |
| + } |
| + |
| + bool writeString(String string, [Encoding encoding]) { |
| + // Not implemented yet. |
| + return true; |
| + } |
| + |
| + void close() { |
| + _buffer._closePending = true; |
| + _buffer.fromBuffer._scheduleCallbacks(); |
| + } |
| + |
| + void destroy() { |
| + _buffer.destroy(); |
| + } |
| + |
| + void flush() { } |
| + |
| + void set onClosed(void callback()) { |
|
nweiz
2012/08/09 22:03:45
The behavior of this callback doesn't match the de
|
| + _onDestroy = callback; |
| + } |
| + |
| + void set onError(void callback(e)) { |
| + throw new StreamException( |
| + "Error events not supported on StreamBuffer"); |
| + } |
| + |
| + void set onNoPendingWrites(void callback()) { |
|
nweiz
2012/08/09 22:03:45
It seems like this should have some implementation
|
| + throw new StreamException( |
| + "onNoPendingWrites handlers not supported on StreamBuffer"); |
| + } |
| + |
| + void _doOnDestroy(_) { |
| + if (_onDestroy != null) { |
| + _onDestroy(); |
| + } |
| + } |
| + |
| + Function _onDestroy; |
| + StreamBuffer _buffer; |
| +} |
|
nweiz
2012/08/09 22:03:45
Now that my [OutputStream.closed] CL is submitted,
|