Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(278)

Unified Diff: quiver/lib/src/streams/streambuffer.dart

Issue 1400473008: Roll Observatory packages and add a roll script (Closed) Base URL: git@github.com:dart-lang/observatory_pub_packages.git@master
Patch Set: Created 5 years, 2 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « quiver/lib/src/streams/enumerate.dart ('k') | quiver/lib/src/time/clock.dart » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: quiver/lib/src/streams/streambuffer.dart
diff --git a/quiver/lib/src/streams/streambuffer.dart b/quiver/lib/src/streams/streambuffer.dart
deleted file mode 100644
index bca610810268a08e0e7072e9d1a8f89b4773d927..0000000000000000000000000000000000000000
--- a/quiver/lib/src/streams/streambuffer.dart
+++ /dev/null
@@ -1,196 +0,0 @@
-// Copyright 2014 Google Inc. All Rights Reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-part of quiver.streams;
-
-/**
- * Underflow errors happen when the socket feeding a buffer is finished while
- * there are still blocked readers. Each reader will complete with this error.
- */
-class UnderflowError extends Error {
- final message;
-
- /// The [message] describes the underflow.
- UnderflowError([this.message]);
-
- String toString() {
- if (message != null) {
- return "StreamBuffer Underflow: $message";
- }
- return "StreamBuffer Underflow";
- }
-}
-
-/**
- * Allow orderly reading of elements from a datastream, such as Socket, which
- * might not receive List<int> bytes regular chunks.
- *
- * Example usage:
- * StreamBuffer<int> buffer = new StreamBuffer();
- * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer));
- * buffer.read(100).then((bytes) {
- * // do something with 100 bytes;
- * });
- *
- * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected
- * [Socket] disconnects.
- */
-class StreamBuffer<T> implements StreamConsumer {
- List _chunks = [];
- int _offset = 0;
- int _counter = 0; // sum(_chunks[*].length) - _offset
- List<_ReaderInWaiting<List<T>>> _readers = [];
- StreamSubscription<T> _sub;
- Completer _streamDone;
-
- final bool _throwOnError;
-
- Stream _currentStream;
-
- int _limit = 0;
-
- set limit(int limit) {
- _limit = limit;
- if (_sub != null) {
- if (!limited || _counter < limit) {
- _sub.resume();
- } else {
- _sub.pause();
- }
- }
- }
-
- int get limit => _limit;
-
- bool get limited => _limit > 0;
-
- /**
- * Create a stream buffer with optional, soft [limit] to the amount of data
- * the buffer will hold before pausing the underlying stream. A limit of 0
- * means no buffer limits.
- */
- StreamBuffer({bool throwOnError: false, int limit: 0})
- : this._throwOnError = throwOnError,
- this._limit = limit;
-
- /**
- * The amount of unread data buffered.
- */
- int get buffered => _counter;
-
- List<T> _consume(int size) {
- var follower = 0;
- var ret = new List(size);
- var leftToRead = size;
- while (leftToRead > 0) {
- var chunk = _chunks.first;
- var listCap = (chunk is List) ? chunk.length - _offset : 1;
- var subsize = leftToRead > listCap ? listCap : leftToRead;
- if (chunk is List) {
- ret.setRange(follower, follower + subsize,
- chunk.getRange(_offset, _offset + subsize));
- } else {
- ret[follower] = chunk;
- }
- follower += subsize;
- _offset += subsize;
- _counter -= subsize;
- leftToRead -= subsize;
- if (chunk is! List || _offset >= chunk.length) {
- _offset = 0;
- _chunks.removeAt(0);
- }
- }
- if (limited && _sub.isPaused && _counter < limit) {
- _sub.resume();
- }
- return ret;
- }
-
- /**
- * Read fully [size] bytes from the stream and return in the future.
- *
- * Throws [ArgumentError] if size is larger than optional buffer [limit].
- */
- Future<List<T>> read(int size) {
- if (limited && size > limit) {
- throw new ArgumentError("Cannot read $size with limit $limit");
- }
-
- // If we have enough data to consume and there are no other readers, then
- // we can return immediately.
- if (size <= buffered && _readers.isEmpty) {
- return new Future.value(_consume(size));
- }
- Completer completer = new Completer<List<T>>();
- _readers.add(new _ReaderInWaiting<List<T>>(size, completer));
- return completer.future;
- }
-
- @override
- Future addStream(Stream<T> stream) {
- var lastStream = _currentStream == null ? stream : _currentStream;
- if (_sub != null) {
- _sub.cancel();
- _streamDone.complete();
- }
- _currentStream = stream;
- Completer streamDone = new Completer();
- _sub = stream.listen((items) {
- _chunks.add(items);
- _counter += items is List ? items.length : 1;
- if (limited && _counter >= limit) {
- _sub.pause();
- }
-
- while (_readers.isNotEmpty && _readers.first.size <= _counter) {
- var waiting = _readers.removeAt(0);
- waiting.completer.complete(_consume(waiting.size));
- }
- }, onDone: () {
- // User is piping in a new stream
- if (stream == lastStream && _throwOnError) {
- _closed(new UnderflowError());
- }
- streamDone.complete();
- }, onError: (e) {
- _closed(e);
- });
- return streamDone.future;
- }
-
- _closed(e) {
- for (var reader in _readers) {
- if (!reader.completer.isCompleted) {
- reader.completer.completeError(e);
- }
- }
- _readers.clear();
- }
-
- Future close() {
- var ret;
- if (_sub != null) {
- ret = _sub.cancel();
- _sub = null;
- }
- return ret is Future ? ret : new Future.value();
- }
-}
-
-class _ReaderInWaiting<T> {
- int size;
- Completer<T> completer;
- _ReaderInWaiting(this.size, this.completer);
-}
« no previous file with comments | « quiver/lib/src/streams/enumerate.dart ('k') | quiver/lib/src/time/clock.dart » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698