| Index: packages/quiver/lib/src/async/stream_buffer.dart
|
| diff --git a/packages/quiver/lib/src/streams/streambuffer.dart b/packages/quiver/lib/src/async/stream_buffer.dart
|
| similarity index 72%
|
| rename from packages/quiver/lib/src/streams/streambuffer.dart
|
| rename to packages/quiver/lib/src/async/stream_buffer.dart
|
| index bca610810268a08e0e7072e9d1a8f89b4773d927..1943d3e4daae619bc77bc071f29957450c74a2ac 100644
|
| --- a/packages/quiver/lib/src/streams/streambuffer.dart
|
| +++ b/packages/quiver/lib/src/async/stream_buffer.dart
|
| @@ -12,12 +12,10 @@
|
| // See the License for the specific language governing permissions and
|
| // limitations under the License.
|
|
|
| -part of quiver.streams;
|
| +part of quiver.async;
|
|
|
| -/**
|
| - * Underflow errors happen when the socket feeding a buffer is finished while
|
| - * there are still blocked readers. Each reader will complete with this error.
|
| - */
|
| +/// Underflow errors happen when the socket feeding a buffer is finished while
|
| +/// there are still blocked readers. Each reader will complete with this error.
|
| class UnderflowError extends Error {
|
| final message;
|
|
|
| @@ -32,22 +30,20 @@ class UnderflowError extends Error {
|
| }
|
| }
|
|
|
| -/**
|
| - * Allow orderly reading of elements from a datastream, such as Socket, which
|
| - * might not receive List<int> bytes regular chunks.
|
| - *
|
| - * Example usage:
|
| - * StreamBuffer<int> buffer = new StreamBuffer();
|
| - * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
|
| - * buffer.read(100).then((bytes) {
|
| - * // do something with 100 bytes;
|
| - * });
|
| - *
|
| - * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
|
| - * [Socket] disconnects.
|
| - */
|
| -class StreamBuffer<T> implements StreamConsumer {
|
| - List _chunks = [];
|
| +/// Allow orderly reading of elements from a datastream, such as Socket, which
|
| +/// might not receive List<int> bytes regular chunks.
|
| +///
|
| +/// Example usage:
|
| +/// StreamBuffer<int> buffer = new StreamBuffer();
|
| +/// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
|
| +/// buffer.read(100).then((bytes) {
|
| +/// // do something with 100 bytes;
|
| +/// });
|
| +///
|
| +/// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
|
| +/// [Socket] disconnects.
|
| +class StreamBuffer<T> implements StreamConsumer<T> {
|
| + List<T> _chunks = [];
|
| int _offset = 0;
|
| int _counter = 0; // sum(_chunks[*].length) - _offset
|
| List<_ReaderInWaiting<List<T>>> _readers = [];
|
| @@ -75,23 +71,19 @@ class StreamBuffer<T> implements StreamConsumer {
|
|
|
| bool get limited => _limit > 0;
|
|
|
| - /**
|
| - * Create a stream buffer with optional, soft [limit] to the amount of data
|
| - * the buffer will hold before pausing the underlying stream. A limit of 0
|
| - * means no buffer limits.
|
| - */
|
| + /// Create a stream buffer with optional, soft [limit] to the amount of data
|
| + /// the buffer will hold before pausing the underlying stream. A limit of 0
|
| + /// means no buffer limits.
|
| StreamBuffer({bool throwOnError: false, int limit: 0})
|
| : this._throwOnError = throwOnError,
|
| this._limit = limit;
|
|
|
| - /**
|
| - * The amount of unread data buffered.
|
| - */
|
| + /// The amount of unread data buffered.
|
| int get buffered => _counter;
|
|
|
| List<T> _consume(int size) {
|
| var follower = 0;
|
| - var ret = new List(size);
|
| + var ret = new List<T>(size);
|
| var leftToRead = size;
|
| while (leftToRead > 0) {
|
| var chunk = _chunks.first;
|
| @@ -99,7 +91,7 @@ class StreamBuffer<T> implements StreamConsumer {
|
| var subsize = leftToRead > listCap ? listCap : leftToRead;
|
| if (chunk is List) {
|
| ret.setRange(follower, follower + subsize,
|
| - chunk.getRange(_offset, _offset + subsize));
|
| + (chunk as List<T>).getRange(_offset, _offset + subsize));
|
| } else {
|
| ret[follower] = chunk;
|
| }
|
| @@ -107,7 +99,7 @@ class StreamBuffer<T> implements StreamConsumer {
|
| _offset += subsize;
|
| _counter -= subsize;
|
| leftToRead -= subsize;
|
| - if (chunk is! List || _offset >= chunk.length) {
|
| + if (chunk is! List || _offset >= (chunk as List).length) {
|
| _offset = 0;
|
| _chunks.removeAt(0);
|
| }
|
| @@ -118,11 +110,9 @@ class StreamBuffer<T> implements StreamConsumer {
|
| return ret;
|
| }
|
|
|
| - /**
|
| - * Read fully [size] bytes from the stream and return in the future.
|
| - *
|
| - * Throws [ArgumentError] if size is larger than optional buffer [limit].
|
| - */
|
| + /// Read fully [size] bytes from the stream and return in the future.
|
| + ///
|
| + /// Throws [ArgumentError] if size is larger than optional buffer [limit].
|
| Future<List<T>> read(int size) {
|
| if (limited && size > limit) {
|
| throw new ArgumentError("Cannot read $size with limit $limit");
|
| @@ -133,7 +123,7 @@ class StreamBuffer<T> implements StreamConsumer {
|
| if (size <= buffered && _readers.isEmpty) {
|
| return new Future.value(_consume(size));
|
| }
|
| - Completer completer = new Completer<List<T>>();
|
| + final completer = new Completer<List<T>>();
|
| _readers.add(new _ReaderInWaiting<List<T>>(size, completer));
|
| return completer.future;
|
| }
|
| @@ -164,16 +154,16 @@ class StreamBuffer<T> implements StreamConsumer {
|
| _closed(new UnderflowError());
|
| }
|
| streamDone.complete();
|
| - }, onError: (e) {
|
| - _closed(e);
|
| + }, onError: (e, stack) {
|
| + _closed(e, stack);
|
| });
|
| return streamDone.future;
|
| }
|
|
|
| - _closed(e) {
|
| + void _closed(e, [StackTrace stack]) {
|
| for (var reader in _readers) {
|
| if (!reader.completer.isCompleted) {
|
| - reader.completer.completeError(e);
|
| + reader.completer.completeError(e, stack);
|
| }
|
| }
|
| _readers.clear();
|
|
|