Index: quiver/lib/src/streams/streambuffer.dart |
diff --git a/quiver/lib/src/streams/streambuffer.dart b/quiver/lib/src/streams/streambuffer.dart |
deleted file mode 100644 |
index bca610810268a08e0e7072e9d1a8f89b4773d927..0000000000000000000000000000000000000000 |
--- a/quiver/lib/src/streams/streambuffer.dart |
+++ /dev/null |
@@ -1,196 +0,0 @@ |
-// Copyright 2014 Google Inc. All Rights Reserved. |
-// |
-// Licensed under the Apache License, Version 2.0 (the "License"); |
-// you may not use this file except in compliance with the License. |
-// You may obtain a copy of the License at |
-// |
-// http://www.apache.org/licenses/LICENSE-2.0 |
-// |
-// Unless required by applicable law or agreed to in writing, software |
-// distributed under the License is distributed on an "AS IS" BASIS, |
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
-// See the License for the specific language governing permissions and |
-// limitations under the License. |
- |
-part of quiver.streams; |
- |
-/** |
- * Underflow errors happen when the socket feeding a buffer is finished while |
- * there are still blocked readers. Each reader will complete with this error. |
- */ |
-class UnderflowError extends Error { |
- final message; |
- |
- /// The [message] describes the underflow. |
- UnderflowError([this.message]); |
- |
- String toString() { |
- if (message != null) { |
- return "StreamBuffer Underflow: $message"; |
- } |
- return "StreamBuffer Underflow"; |
- } |
-} |
- |
-/** |
- * Allow orderly reading of elements from a datastream, such as Socket, which |
- * might not receive List<int> bytes regular chunks. |
- * |
- * Example usage: |
- * StreamBuffer<int> buffer = new StreamBuffer(); |
- * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
- * buffer.read(100).then((bytes) { |
- * // do something with 100 bytes; |
- * }); |
- * |
- * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
- * [Socket] disconnects. |
- */ |
-class StreamBuffer<T> implements StreamConsumer { |
- List _chunks = []; |
- int _offset = 0; |
- int _counter = 0; // sum(_chunks[*].length) - _offset |
- List<_ReaderInWaiting<List<T>>> _readers = []; |
- StreamSubscription<T> _sub; |
- Completer _streamDone; |
- |
- final bool _throwOnError; |
- |
- Stream _currentStream; |
- |
- int _limit = 0; |
- |
- set limit(int limit) { |
- _limit = limit; |
- if (_sub != null) { |
- if (!limited || _counter < limit) { |
- _sub.resume(); |
- } else { |
- _sub.pause(); |
- } |
- } |
- } |
- |
- int get limit => _limit; |
- |
- bool get limited => _limit > 0; |
- |
- /** |
- * Create a stream buffer with optional, soft [limit] to the amount of data |
- * the buffer will hold before pausing the underlying stream. A limit of 0 |
- * means no buffer limits. |
- */ |
- StreamBuffer({bool throwOnError: false, int limit: 0}) |
- : this._throwOnError = throwOnError, |
- this._limit = limit; |
- |
- /** |
- * The amount of unread data buffered. |
- */ |
- int get buffered => _counter; |
- |
- List<T> _consume(int size) { |
- var follower = 0; |
- var ret = new List(size); |
- var leftToRead = size; |
- while (leftToRead > 0) { |
- var chunk = _chunks.first; |
- var listCap = (chunk is List) ? chunk.length - _offset : 1; |
- var subsize = leftToRead > listCap ? listCap : leftToRead; |
- if (chunk is List) { |
- ret.setRange(follower, follower + subsize, |
- chunk.getRange(_offset, _offset + subsize)); |
- } else { |
- ret[follower] = chunk; |
- } |
- follower += subsize; |
- _offset += subsize; |
- _counter -= subsize; |
- leftToRead -= subsize; |
- if (chunk is! List || _offset >= chunk.length) { |
- _offset = 0; |
- _chunks.removeAt(0); |
- } |
- } |
- if (limited && _sub.isPaused && _counter < limit) { |
- _sub.resume(); |
- } |
- return ret; |
- } |
- |
- /** |
- * Read fully [size] bytes from the stream and return in the future. |
- * |
- * Throws [ArgumentError] if size is larger than optional buffer [limit]. |
- */ |
- Future<List<T>> read(int size) { |
- if (limited && size > limit) { |
- throw new ArgumentError("Cannot read $size with limit $limit"); |
- } |
- |
- // If we have enough data to consume and there are no other readers, then |
- // we can return immediately. |
- if (size <= buffered && _readers.isEmpty) { |
- return new Future.value(_consume(size)); |
- } |
- Completer completer = new Completer<List<T>>(); |
- _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); |
- return completer.future; |
- } |
- |
- @override |
- Future addStream(Stream<T> stream) { |
- var lastStream = _currentStream == null ? stream : _currentStream; |
- if (_sub != null) { |
- _sub.cancel(); |
- _streamDone.complete(); |
- } |
- _currentStream = stream; |
- Completer streamDone = new Completer(); |
- _sub = stream.listen((items) { |
- _chunks.add(items); |
- _counter += items is List ? items.length : 1; |
- if (limited && _counter >= limit) { |
- _sub.pause(); |
- } |
- |
- while (_readers.isNotEmpty && _readers.first.size <= _counter) { |
- var waiting = _readers.removeAt(0); |
- waiting.completer.complete(_consume(waiting.size)); |
- } |
- }, onDone: () { |
- // User is piping in a new stream |
- if (stream == lastStream && _throwOnError) { |
- _closed(new UnderflowError()); |
- } |
- streamDone.complete(); |
- }, onError: (e) { |
- _closed(e); |
- }); |
- return streamDone.future; |
- } |
- |
- _closed(e) { |
- for (var reader in _readers) { |
- if (!reader.completer.isCompleted) { |
- reader.completer.completeError(e); |
- } |
- } |
- _readers.clear(); |
- } |
- |
- Future close() { |
- var ret; |
- if (_sub != null) { |
- ret = _sub.cancel(); |
- _sub = null; |
- } |
- return ret is Future ? ret : new Future.value(); |
- } |
-} |
- |
-class _ReaderInWaiting<T> { |
- int size; |
- Completer<T> completer; |
- _ReaderInWaiting(this.size, this.completer); |
-} |