| OLD | NEW |
| 1 // Copyright 2014 Google Inc. All Rights Reserved. | 1 // Copyright 2014 Google Inc. All Rights Reserved. |
| 2 // | 2 // |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
| 5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
| 6 // | 6 // |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 // | 8 // |
| 9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
| 13 // limitations under the License. | 13 // limitations under the License. |
| 14 | 14 |
| 15 part of quiver.streams; | 15 part of quiver.async; |
| 16 | 16 |
| 17 /** | 17 /// Underflow errors happen when the socket feeding a buffer is finished while |
| 18 * Underflow errors happen when the socket feeding a buffer is finished while | 18 /// there are still blocked readers. Each reader will complete with this error. |
| 19 * there are still blocked readers. Each reader will complete with this error. | |
| 20 */ | |
| 21 class UnderflowError extends Error { | 19 class UnderflowError extends Error { |
| 22 final message; | 20 final message; |
| 23 | 21 |
| 24 /// The [message] describes the underflow. | 22 /// The [message] describes the underflow. |
| 25 UnderflowError([this.message]); | 23 UnderflowError([this.message]); |
| 26 | 24 |
| 27 String toString() { | 25 String toString() { |
| 28 if (message != null) { | 26 if (message != null) { |
| 29 return "StreamBuffer Underflow: $message"; | 27 return "StreamBuffer Underflow: $message"; |
| 30 } | 28 } |
| 31 return "StreamBuffer Underflow"; | 29 return "StreamBuffer Underflow"; |
| 32 } | 30 } |
| 33 } | 31 } |
| 34 | 32 |
| 35 /** | 33 /// Allow orderly reading of elements from a datastream, such as Socket, which |
| 36 * Allow orderly reading of elements from a datastream, such as Socket, which | 34 /// might not receive List<int> bytes regular chunks. |
| 37 * might not receive List<int> bytes regular chunks. | 35 /// |
| 38 * | 36 /// Example usage: |
| 39 * Example usage: | 37 /// StreamBuffer<int> buffer = new StreamBuffer(); |
| 40 * StreamBuffer<int> buffer = new StreamBuffer(); | 38 /// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
| 41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); | 39 /// buffer.read(100).then((bytes) { |
| 42 * buffer.read(100).then((bytes) { | 40 /// // do something with 100 bytes; |
| 43 * // do something with 100 bytes; | 41 /// }); |
| 44 * }); | 42 /// |
| 45 * | 43 /// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
| 46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected | 44 /// [Socket] disconnects. |
| 47 * [Socket] disconnects. | 45 class StreamBuffer<T> implements StreamConsumer<T> { |
| 48 */ | 46 List<T> _chunks = []; |
| 49 class StreamBuffer<T> implements StreamConsumer { | |
| 50 List _chunks = []; | |
| 51 int _offset = 0; | 47 int _offset = 0; |
| 52 int _counter = 0; // sum(_chunks[*].length) - _offset | 48 int _counter = 0; // sum(_chunks[*].length) - _offset |
| 53 List<_ReaderInWaiting<List<T>>> _readers = []; | 49 List<_ReaderInWaiting<List<T>>> _readers = []; |
| 54 StreamSubscription<T> _sub; | 50 StreamSubscription<T> _sub; |
| 55 Completer _streamDone; | 51 Completer _streamDone; |
| 56 | 52 |
| 57 final bool _throwOnError; | 53 final bool _throwOnError; |
| 58 | 54 |
| 59 Stream _currentStream; | 55 Stream _currentStream; |
| 60 | 56 |
| 61 int _limit = 0; | 57 int _limit = 0; |
| 62 | 58 |
| 63 set limit(int limit) { | 59 set limit(int limit) { |
| 64 _limit = limit; | 60 _limit = limit; |
| 65 if (_sub != null) { | 61 if (_sub != null) { |
| 66 if (!limited || _counter < limit) { | 62 if (!limited || _counter < limit) { |
| 67 _sub.resume(); | 63 _sub.resume(); |
| 68 } else { | 64 } else { |
| 69 _sub.pause(); | 65 _sub.pause(); |
| 70 } | 66 } |
| 71 } | 67 } |
| 72 } | 68 } |
| 73 | 69 |
| 74 int get limit => _limit; | 70 int get limit => _limit; |
| 75 | 71 |
| 76 bool get limited => _limit > 0; | 72 bool get limited => _limit > 0; |
| 77 | 73 |
| 78 /** | 74 /// Create a stream buffer with optional, soft [limit] to the amount of data |
| 79 * Create a stream buffer with optional, soft [limit] to the amount of data | 75 /// the buffer will hold before pausing the underlying stream. A limit of 0 |
| 80 * the buffer will hold before pausing the underlying stream. A limit of 0 | 76 /// means no buffer limits. |
| 81 * means no buffer limits. | |
| 82 */ | |
| 83 StreamBuffer({bool throwOnError: false, int limit: 0}) | 77 StreamBuffer({bool throwOnError: false, int limit: 0}) |
| 84 : this._throwOnError = throwOnError, | 78 : this._throwOnError = throwOnError, |
| 85 this._limit = limit; | 79 this._limit = limit; |
| 86 | 80 |
| 87 /** | 81 /// The amount of unread data buffered. |
| 88 * The amount of unread data buffered. | |
| 89 */ | |
| 90 int get buffered => _counter; | 82 int get buffered => _counter; |
| 91 | 83 |
| 92 List<T> _consume(int size) { | 84 List<T> _consume(int size) { |
| 93 var follower = 0; | 85 var follower = 0; |
| 94 var ret = new List(size); | 86 var ret = new List<T>(size); |
| 95 var leftToRead = size; | 87 var leftToRead = size; |
| 96 while (leftToRead > 0) { | 88 while (leftToRead > 0) { |
| 97 var chunk = _chunks.first; | 89 var chunk = _chunks.first; |
| 98 var listCap = (chunk is List) ? chunk.length - _offset : 1; | 90 var listCap = (chunk is List) ? chunk.length - _offset : 1; |
| 99 var subsize = leftToRead > listCap ? listCap : leftToRead; | 91 var subsize = leftToRead > listCap ? listCap : leftToRead; |
| 100 if (chunk is List) { | 92 if (chunk is List) { |
| 101 ret.setRange(follower, follower + subsize, | 93 ret.setRange(follower, follower + subsize, |
| 102 chunk.getRange(_offset, _offset + subsize)); | 94 (chunk as List<T>).getRange(_offset, _offset + subsize)); |
| 103 } else { | 95 } else { |
| 104 ret[follower] = chunk; | 96 ret[follower] = chunk; |
| 105 } | 97 } |
| 106 follower += subsize; | 98 follower += subsize; |
| 107 _offset += subsize; | 99 _offset += subsize; |
| 108 _counter -= subsize; | 100 _counter -= subsize; |
| 109 leftToRead -= subsize; | 101 leftToRead -= subsize; |
| 110 if (chunk is! List || _offset >= chunk.length) { | 102 if (chunk is! List || _offset >= (chunk as List).length) { |
| 111 _offset = 0; | 103 _offset = 0; |
| 112 _chunks.removeAt(0); | 104 _chunks.removeAt(0); |
| 113 } | 105 } |
| 114 } | 106 } |
| 115 if (limited && _sub.isPaused && _counter < limit) { | 107 if (limited && _sub.isPaused && _counter < limit) { |
| 116 _sub.resume(); | 108 _sub.resume(); |
| 117 } | 109 } |
| 118 return ret; | 110 return ret; |
| 119 } | 111 } |
| 120 | 112 |
| 121 /** | 113 /// Read fully [size] bytes from the stream and return in the future. |
| 122 * Read fully [size] bytes from the stream and return in the future. | 114 /// |
| 123 * | 115 /// Throws [ArgumentError] if size is larger than optional buffer [limit]. |
| 124 * Throws [ArgumentError] if size is larger than optional buffer [limit]. | |
| 125 */ | |
| 126 Future<List<T>> read(int size) { | 116 Future<List<T>> read(int size) { |
| 127 if (limited && size > limit) { | 117 if (limited && size > limit) { |
| 128 throw new ArgumentError("Cannot read $size with limit $limit"); | 118 throw new ArgumentError("Cannot read $size with limit $limit"); |
| 129 } | 119 } |
| 130 | 120 |
| 131 // If we have enough data to consume and there are no other readers, then | 121 // If we have enough data to consume and there are no other readers, then |
| 132 // we can return immediately. | 122 // we can return immediately. |
| 133 if (size <= buffered && _readers.isEmpty) { | 123 if (size <= buffered && _readers.isEmpty) { |
| 134 return new Future.value(_consume(size)); | 124 return new Future.value(_consume(size)); |
| 135 } | 125 } |
| 136 Completer completer = new Completer<List<T>>(); | 126 final completer = new Completer<List<T>>(); |
| 137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); | 127 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); |
| 138 return completer.future; | 128 return completer.future; |
| 139 } | 129 } |
| 140 | 130 |
| 141 @override | 131 @override |
| 142 Future addStream(Stream<T> stream) { | 132 Future addStream(Stream<T> stream) { |
| 143 var lastStream = _currentStream == null ? stream : _currentStream; | 133 var lastStream = _currentStream == null ? stream : _currentStream; |
| 144 if (_sub != null) { | 134 if (_sub != null) { |
| 145 _sub.cancel(); | 135 _sub.cancel(); |
| 146 _streamDone.complete(); | 136 _streamDone.complete(); |
| (...skipping 10 matching lines...) Expand all Loading... |
| 157 while (_readers.isNotEmpty && _readers.first.size <= _counter) { | 147 while (_readers.isNotEmpty && _readers.first.size <= _counter) { |
| 158 var waiting = _readers.removeAt(0); | 148 var waiting = _readers.removeAt(0); |
| 159 waiting.completer.complete(_consume(waiting.size)); | 149 waiting.completer.complete(_consume(waiting.size)); |
| 160 } | 150 } |
| 161 }, onDone: () { | 151 }, onDone: () { |
| 162 // User is piping in a new stream | 152 // User is piping in a new stream |
| 163 if (stream == lastStream && _throwOnError) { | 153 if (stream == lastStream && _throwOnError) { |
| 164 _closed(new UnderflowError()); | 154 _closed(new UnderflowError()); |
| 165 } | 155 } |
| 166 streamDone.complete(); | 156 streamDone.complete(); |
| 167 }, onError: (e) { | 157 }, onError: (e, stack) { |
| 168 _closed(e); | 158 _closed(e, stack); |
| 169 }); | 159 }); |
| 170 return streamDone.future; | 160 return streamDone.future; |
| 171 } | 161 } |
| 172 | 162 |
| 173 _closed(e) { | 163 void _closed(e, [StackTrace stack]) { |
| 174 for (var reader in _readers) { | 164 for (var reader in _readers) { |
| 175 if (!reader.completer.isCompleted) { | 165 if (!reader.completer.isCompleted) { |
| 176 reader.completer.completeError(e); | 166 reader.completer.completeError(e, stack); |
| 177 } | 167 } |
| 178 } | 168 } |
| 179 _readers.clear(); | 169 _readers.clear(); |
| 180 } | 170 } |
| 181 | 171 |
| 182 Future close() { | 172 Future close() { |
| 183 var ret; | 173 var ret; |
| 184 if (_sub != null) { | 174 if (_sub != null) { |
| 185 ret = _sub.cancel(); | 175 ret = _sub.cancel(); |
| 186 _sub = null; | 176 _sub = null; |
| 187 } | 177 } |
| 188 return ret is Future ? ret : new Future.value(); | 178 return ret is Future ? ret : new Future.value(); |
| 189 } | 179 } |
| 190 } | 180 } |
| 191 | 181 |
| 192 class _ReaderInWaiting<T> { | 182 class _ReaderInWaiting<T> { |
| 193 int size; | 183 int size; |
| 194 Completer<T> completer; | 184 Completer<T> completer; |
| 195 _ReaderInWaiting(this.size, this.completer); | 185 _ReaderInWaiting(this.size, this.completer); |
| 196 } | 186 } |
| OLD | NEW |