| Index: quiver/lib/src/streams/streambuffer.dart
|
| diff --git a/quiver/lib/src/streams/streambuffer.dart b/quiver/lib/src/streams/streambuffer.dart
|
| deleted file mode 100644
|
| index bca610810268a08e0e7072e9d1a8f89b4773d927..0000000000000000000000000000000000000000
|
| --- a/quiver/lib/src/streams/streambuffer.dart
|
| +++ /dev/null
|
| @@ -1,196 +0,0 @@
|
| -// Copyright 2014 Google Inc. All Rights Reserved.
|
| -//
|
| -// Licensed under the Apache License, Version 2.0 (the "License");
|
| -// you may not use this file except in compliance with the License.
|
| -// You may obtain a copy of the License at
|
| -//
|
| -// http://www.apache.org/licenses/LICENSE-2.0
|
| -//
|
| -// Unless required by applicable law or agreed to in writing, software
|
| -// distributed under the License is distributed on an "AS IS" BASIS,
|
| -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
| -// See the License for the specific language governing permissions and
|
| -// limitations under the License.
|
| -
|
| -part of quiver.streams;
|
| -
|
| -/**
|
| - * Underflow errors happen when the socket feeding a buffer is finished while
|
| - * there are still blocked readers. Each reader will complete with this error.
|
| - */
|
| -class UnderflowError extends Error {
|
| - final message;
|
| -
|
| - /// The [message] describes the underflow.
|
| - UnderflowError([this.message]);
|
| -
|
| - String toString() {
|
| - if (message != null) {
|
| - return "StreamBuffer Underflow: $message";
|
| - }
|
| - return "StreamBuffer Underflow";
|
| - }
|
| -}
|
| -
|
| -/**
|
| - * Allow orderly reading of elements from a datastream, such as Socket, which
|
| - * might not receive List<int> bytes regular chunks.
|
| - *
|
| - * Example usage:
|
| - * StreamBuffer<int> buffer = new StreamBuffer();
|
| - * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
|
| - * buffer.read(100).then((bytes) {
|
| - * // do something with 100 bytes;
|
| - * });
|
| - *
|
| - * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
|
| - * [Socket] disconnects.
|
| - */
|
| -class StreamBuffer<T> implements StreamConsumer {
|
| - List _chunks = [];
|
| - int _offset = 0;
|
| - int _counter = 0; // sum(_chunks[*].length) - _offset
|
| - List<_ReaderInWaiting<List<T>>> _readers = [];
|
| - StreamSubscription<T> _sub;
|
| - Completer _streamDone;
|
| -
|
| - final bool _throwOnError;
|
| -
|
| - Stream _currentStream;
|
| -
|
| - int _limit = 0;
|
| -
|
| - set limit(int limit) {
|
| - _limit = limit;
|
| - if (_sub != null) {
|
| - if (!limited || _counter < limit) {
|
| - _sub.resume();
|
| - } else {
|
| - _sub.pause();
|
| - }
|
| - }
|
| - }
|
| -
|
| - int get limit => _limit;
|
| -
|
| - bool get limited => _limit > 0;
|
| -
|
| - /**
|
| - * Create a stream buffer with optional, soft [limit] to the amount of data
|
| - * the buffer will hold before pausing the underlying stream. A limit of 0
|
| - * means no buffer limits.
|
| - */
|
| - StreamBuffer({bool throwOnError: false, int limit: 0})
|
| - : this._throwOnError = throwOnError,
|
| - this._limit = limit;
|
| -
|
| - /**
|
| - * The amount of unread data buffered.
|
| - */
|
| - int get buffered => _counter;
|
| -
|
| - List<T> _consume(int size) {
|
| - var follower = 0;
|
| - var ret = new List(size);
|
| - var leftToRead = size;
|
| - while (leftToRead > 0) {
|
| - var chunk = _chunks.first;
|
| - var listCap = (chunk is List) ? chunk.length - _offset : 1;
|
| - var subsize = leftToRead > listCap ? listCap : leftToRead;
|
| - if (chunk is List) {
|
| - ret.setRange(follower, follower + subsize,
|
| - chunk.getRange(_offset, _offset + subsize));
|
| - } else {
|
| - ret[follower] = chunk;
|
| - }
|
| - follower += subsize;
|
| - _offset += subsize;
|
| - _counter -= subsize;
|
| - leftToRead -= subsize;
|
| - if (chunk is! List || _offset >= chunk.length) {
|
| - _offset = 0;
|
| - _chunks.removeAt(0);
|
| - }
|
| - }
|
| - if (limited && _sub.isPaused && _counter < limit) {
|
| - _sub.resume();
|
| - }
|
| - return ret;
|
| - }
|
| -
|
| - /**
|
| - * Read fully [size] bytes from the stream and return in the future.
|
| - *
|
| - * Throws [ArgumentError] if size is larger than optional buffer [limit].
|
| - */
|
| - Future<List<T>> read(int size) {
|
| - if (limited && size > limit) {
|
| - throw new ArgumentError("Cannot read $size with limit $limit");
|
| - }
|
| -
|
| - // If we have enough data to consume and there are no other readers, then
|
| - // we can return immediately.
|
| - if (size <= buffered && _readers.isEmpty) {
|
| - return new Future.value(_consume(size));
|
| - }
|
| - Completer completer = new Completer<List<T>>();
|
| - _readers.add(new _ReaderInWaiting<List<T>>(size, completer));
|
| - return completer.future;
|
| - }
|
| -
|
| - @override
|
| - Future addStream(Stream<T> stream) {
|
| - var lastStream = _currentStream == null ? stream : _currentStream;
|
| - if (_sub != null) {
|
| - _sub.cancel();
|
| - _streamDone.complete();
|
| - }
|
| - _currentStream = stream;
|
| - Completer streamDone = new Completer();
|
| - _sub = stream.listen((items) {
|
| - _chunks.add(items);
|
| - _counter += items is List ? items.length : 1;
|
| - if (limited && _counter >= limit) {
|
| - _sub.pause();
|
| - }
|
| -
|
| - while (_readers.isNotEmpty && _readers.first.size <= _counter) {
|
| - var waiting = _readers.removeAt(0);
|
| - waiting.completer.complete(_consume(waiting.size));
|
| - }
|
| - }, onDone: () {
|
| - // User is piping in a new stream
|
| - if (stream == lastStream && _throwOnError) {
|
| - _closed(new UnderflowError());
|
| - }
|
| - streamDone.complete();
|
| - }, onError: (e) {
|
| - _closed(e);
|
| - });
|
| - return streamDone.future;
|
| - }
|
| -
|
| - _closed(e) {
|
| - for (var reader in _readers) {
|
| - if (!reader.completer.isCompleted) {
|
| - reader.completer.completeError(e);
|
| - }
|
| - }
|
| - _readers.clear();
|
| - }
|
| -
|
| - Future close() {
|
| - var ret;
|
| - if (_sub != null) {
|
| - ret = _sub.cancel();
|
| - _sub = null;
|
| - }
|
| - return ret is Future ? ret : new Future.value();
|
| - }
|
| -}
|
| -
|
| -class _ReaderInWaiting<T> {
|
| - int size;
|
| - Completer<T> completer;
|
| - _ReaderInWaiting(this.size, this.completer);
|
| -}
|
|
|