Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(18)

Unified Diff: packages/quiver/lib/src/async/stream_buffer.dart

Issue 2989763002: Update charted to 0.4.8 and roll (Closed)
Patch Set: Removed Cutch from list of reviewers Created 3 years, 5 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « packages/quiver/lib/src/async/metronome.dart ('k') | packages/quiver/lib/src/async/stream_router.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: packages/quiver/lib/src/async/stream_buffer.dart
diff --git a/packages/quiver/lib/src/streams/streambuffer.dart b/packages/quiver/lib/src/async/stream_buffer.dart
similarity index 72%
rename from packages/quiver/lib/src/streams/streambuffer.dart
rename to packages/quiver/lib/src/async/stream_buffer.dart
index bca610810268a08e0e7072e9d1a8f89b4773d927..1943d3e4daae619bc77bc071f29957450c74a2ac 100644
--- a/packages/quiver/lib/src/streams/streambuffer.dart
+++ b/packages/quiver/lib/src/async/stream_buffer.dart
@@ -12,12 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-part of quiver.streams;
+part of quiver.async;
-/**
- * Underflow errors happen when the socket feeding a buffer is finished while
- * there are still blocked readers. Each reader will complete with this error.
- */
+/// Underflow errors happen when the socket feeding a buffer is finished while
+/// there are still blocked readers. Each reader will complete with this error.
class UnderflowError extends Error {
final message;
@@ -32,22 +30,20 @@ class UnderflowError extends Error {
}
}
-/**
- * Allow orderly reading of elements from a datastream, such as Socket, which
- * might not receive List<int> bytes regular chunks.
- *
- * Example usage:
- * StreamBuffer<int> buffer = new StreamBuffer();
- * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
- * buffer.read(100).then((bytes) {
- * // do something with 100 bytes;
- * });
- *
- * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
- * [Socket] disconnects.
- */
-class StreamBuffer<T> implements StreamConsumer {
- List _chunks = [];
+/// Allow orderly reading of elements from a datastream, such as Socket, which
+/// might not receive List<int> bytes regular chunks.
+///
+/// Example usage:
+/// StreamBuffer<int> buffer = new StreamBuffer();
+/// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
+/// buffer.read(100).then((bytes) {
+/// // do something with 100 bytes;
+/// });
+///
+/// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
+/// [Socket] disconnects.
+class StreamBuffer<T> implements StreamConsumer<T> {
+ List<T> _chunks = [];
int _offset = 0;
int _counter = 0; // sum(_chunks[*].length) - _offset
List<_ReaderInWaiting<List<T>>> _readers = [];
@@ -75,23 +71,19 @@ class StreamBuffer<T> implements StreamConsumer {
bool get limited => _limit > 0;
- /**
- * Create a stream buffer with optional, soft [limit] to the amount of data
- * the buffer will hold before pausing the underlying stream. A limit of 0
- * means no buffer limits.
- */
+ /// Create a stream buffer with optional, soft [limit] to the amount of data
+ /// the buffer will hold before pausing the underlying stream. A limit of 0
+ /// means no buffer limits.
StreamBuffer({bool throwOnError: false, int limit: 0})
: this._throwOnError = throwOnError,
this._limit = limit;
- /**
- * The amount of unread data buffered.
- */
+ /// The amount of unread data buffered.
int get buffered => _counter;
List<T> _consume(int size) {
var follower = 0;
- var ret = new List(size);
+ var ret = new List<T>(size);
var leftToRead = size;
while (leftToRead > 0) {
var chunk = _chunks.first;
@@ -99,7 +91,7 @@ class StreamBuffer<T> implements StreamConsumer {
var subsize = leftToRead > listCap ? listCap : leftToRead;
if (chunk is List) {
ret.setRange(follower, follower + subsize,
- chunk.getRange(_offset, _offset + subsize));
+ (chunk as List<T>).getRange(_offset, _offset + subsize));
} else {
ret[follower] = chunk;
}
@@ -107,7 +99,7 @@ class StreamBuffer<T> implements StreamConsumer {
_offset += subsize;
_counter -= subsize;
leftToRead -= subsize;
- if (chunk is! List || _offset >= chunk.length) {
+ if (chunk is! List || _offset >= (chunk as List).length) {
_offset = 0;
_chunks.removeAt(0);
}
@@ -118,11 +110,9 @@ class StreamBuffer<T> implements StreamConsumer {
return ret;
}
- /**
- * Read fully [size] bytes from the stream and return in the future.
- *
- * Throws [ArgumentError] if size is larger than optional buffer [limit].
- */
+ /// Read fully [size] bytes from the stream and return in the future.
+ ///
+ /// Throws [ArgumentError] if size is larger than optional buffer [limit].
Future<List<T>> read(int size) {
if (limited && size > limit) {
throw new ArgumentError("Cannot read $size with limit $limit");
@@ -133,7 +123,7 @@ class StreamBuffer<T> implements StreamConsumer {
if (size <= buffered && _readers.isEmpty) {
return new Future.value(_consume(size));
}
- Completer completer = new Completer<List<T>>();
+ final completer = new Completer<List<T>>();
_readers.add(new _ReaderInWaiting<List<T>>(size, completer));
return completer.future;
}
@@ -164,16 +154,16 @@ class StreamBuffer<T> implements StreamConsumer {
_closed(new UnderflowError());
}
streamDone.complete();
- }, onError: (e) {
- _closed(e);
+ }, onError: (e, stack) {
+ _closed(e, stack);
});
return streamDone.future;
}
- _closed(e) {
+ void _closed(e, [StackTrace stack]) {
for (var reader in _readers) {
if (!reader.completer.isCompleted) {
- reader.completer.completeError(e);
+ reader.completer.completeError(e, stack);
}
}
_readers.clear();
« no previous file with comments | « packages/quiver/lib/src/async/metronome.dart ('k') | packages/quiver/lib/src/async/stream_router.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698