Index: packages/quiver/lib/src/async/stream_buffer.dart |
diff --git a/packages/quiver/lib/src/streams/streambuffer.dart b/packages/quiver/lib/src/async/stream_buffer.dart |
similarity index 72% |
rename from packages/quiver/lib/src/streams/streambuffer.dart |
rename to packages/quiver/lib/src/async/stream_buffer.dart |
index bca610810268a08e0e7072e9d1a8f89b4773d927..1943d3e4daae619bc77bc071f29957450c74a2ac 100644 |
--- a/packages/quiver/lib/src/streams/streambuffer.dart |
+++ b/packages/quiver/lib/src/async/stream_buffer.dart |
@@ -12,12 +12,10 @@ |
// See the License for the specific language governing permissions and |
// limitations under the License. |
-part of quiver.streams; |
+part of quiver.async; |
-/** |
- * Underflow errors happen when the socket feeding a buffer is finished while |
- * there are still blocked readers. Each reader will complete with this error. |
- */ |
+/// Underflow errors happen when the socket feeding a buffer is finished while |
+/// there are still blocked readers. Each reader will complete with this error. |
class UnderflowError extends Error { |
final message; |
@@ -32,22 +30,20 @@ class UnderflowError extends Error { |
} |
} |
-/** |
- * Allow orderly reading of elements from a datastream, such as Socket, which |
- * might not receive List<int> bytes regular chunks. |
- * |
- * Example usage: |
- * StreamBuffer<int> buffer = new StreamBuffer(); |
- * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
- * buffer.read(100).then((bytes) { |
- * // do something with 100 bytes; |
- * }); |
- * |
- * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
- * [Socket] disconnects. |
- */ |
-class StreamBuffer<T> implements StreamConsumer { |
- List _chunks = []; |
+/// Allow orderly reading of elements from a datastream, such as Socket, which |
+/// might not receive List<int> bytes regular chunks. |
+/// |
+/// Example usage: |
+/// StreamBuffer<int> buffer = new StreamBuffer(); |
+/// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
+/// buffer.read(100).then((bytes) { |
+/// // do something with 100 bytes; |
+/// }); |
+/// |
+/// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
+/// [Socket] disconnects. |
+class StreamBuffer<T> implements StreamConsumer<T> { |
+ List<T> _chunks = []; |
int _offset = 0; |
int _counter = 0; // sum(_chunks[*].length) - _offset |
List<_ReaderInWaiting<List<T>>> _readers = []; |
@@ -75,23 +71,19 @@ class StreamBuffer<T> implements StreamConsumer { |
bool get limited => _limit > 0; |
- /** |
- * Create a stream buffer with optional, soft [limit] to the amount of data |
- * the buffer will hold before pausing the underlying stream. A limit of 0 |
- * means no buffer limits. |
- */ |
+ /// Create a stream buffer with optional, soft [limit] to the amount of data |
+ /// the buffer will hold before pausing the underlying stream. A limit of 0 |
+ /// means no buffer limits. |
StreamBuffer({bool throwOnError: false, int limit: 0}) |
: this._throwOnError = throwOnError, |
this._limit = limit; |
- /** |
- * The amount of unread data buffered. |
- */ |
+ /// The amount of unread data buffered. |
int get buffered => _counter; |
List<T> _consume(int size) { |
var follower = 0; |
- var ret = new List(size); |
+ var ret = new List<T>(size); |
var leftToRead = size; |
while (leftToRead > 0) { |
var chunk = _chunks.first; |
@@ -99,7 +91,7 @@ class StreamBuffer<T> implements StreamConsumer { |
var subsize = leftToRead > listCap ? listCap : leftToRead; |
if (chunk is List) { |
ret.setRange(follower, follower + subsize, |
- chunk.getRange(_offset, _offset + subsize)); |
+ (chunk as List<T>).getRange(_offset, _offset + subsize)); |
} else { |
ret[follower] = chunk; |
} |
@@ -107,7 +99,7 @@ class StreamBuffer<T> implements StreamConsumer { |
_offset += subsize; |
_counter -= subsize; |
leftToRead -= subsize; |
- if (chunk is! List || _offset >= chunk.length) { |
+ if (chunk is! List || _offset >= (chunk as List).length) { |
_offset = 0; |
_chunks.removeAt(0); |
} |
@@ -118,11 +110,9 @@ class StreamBuffer<T> implements StreamConsumer { |
return ret; |
} |
- /** |
- * Read fully [size] bytes from the stream and return in the future. |
- * |
- * Throws [ArgumentError] if size is larger than optional buffer [limit]. |
- */ |
+ /// Read fully [size] bytes from the stream and return in the future. |
+ /// |
+ /// Throws [ArgumentError] if size is larger than optional buffer [limit]. |
Future<List<T>> read(int size) { |
if (limited && size > limit) { |
throw new ArgumentError("Cannot read $size with limit $limit"); |
@@ -133,7 +123,7 @@ class StreamBuffer<T> implements StreamConsumer { |
if (size <= buffered && _readers.isEmpty) { |
return new Future.value(_consume(size)); |
} |
- Completer completer = new Completer<List<T>>(); |
+ final completer = new Completer<List<T>>(); |
_readers.add(new _ReaderInWaiting<List<T>>(size, completer)); |
return completer.future; |
} |
@@ -164,16 +154,16 @@ class StreamBuffer<T> implements StreamConsumer { |
_closed(new UnderflowError()); |
} |
streamDone.complete(); |
- }, onError: (e) { |
- _closed(e); |
+ }, onError: (e, stack) { |
+ _closed(e, stack); |
}); |
return streamDone.future; |
} |
- _closed(e) { |
+ void _closed(e, [StackTrace stack]) { |
for (var reader in _readers) { |
if (!reader.completer.isCompleted) { |
- reader.completer.completeError(e); |
+ reader.completer.completeError(e, stack); |
} |
} |
_readers.clear(); |