| OLD | NEW |
| (Empty) |
| 1 // Copyright 2014 Google Inc. All Rights Reserved. | |
| 2 // | |
| 3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
| 4 // you may not use this file except in compliance with the License. | |
| 5 // You may obtain a copy of the License at | |
| 6 // | |
| 7 // http://www.apache.org/licenses/LICENSE-2.0 | |
| 8 // | |
| 9 // Unless required by applicable law or agreed to in writing, software | |
| 10 // distributed under the License is distributed on an "AS IS" BASIS, | |
| 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| 12 // See the License for the specific language governing permissions and | |
| 13 // limitations under the License. | |
| 14 | |
| 15 part of quiver.streams; | |
| 16 | |
| 17 /** | |
| 18 * Underflow errors happen when the socket feeding a buffer is finished while | |
| 19 * there are still blocked readers. Each reader will complete with this error. | |
| 20 */ | |
| 21 class UnderflowError extends Error { | |
| 22 final message; | |
| 23 | |
| 24 /// The [message] describes the underflow. | |
| 25 UnderflowError([this.message]); | |
| 26 | |
| 27 String toString() { | |
| 28 if (message != null) { | |
| 29 return "StreamBuffer Underflow: $message"; | |
| 30 } | |
| 31 return "StreamBuffer Underflow"; | |
| 32 } | |
| 33 } | |
| 34 | |
| 35 /** | |
| 36 * Allow orderly reading of elements from a datastream, such as Socket, which | |
| 37 * might not receive List<int> bytes regular chunks. | |
| 38 * | |
| 39 * Example usage: | |
| 40 * StreamBuffer<int> buffer = new StreamBuffer(); | |
| 41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); | |
| 42 * buffer.read(100).then((bytes) { | |
| 43 * // do something with 100 bytes; | |
| 44 * }); | |
| 45 * | |
| 46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected | |
| 47 * [Socket] disconnects. | |
| 48 */ | |
| 49 class StreamBuffer<T> implements StreamConsumer { | |
| 50 List _chunks = []; | |
| 51 int _offset = 0; | |
| 52 int _counter = 0; // sum(_chunks[*].length) - _offset | |
| 53 List<_ReaderInWaiting<List<T>>> _readers = []; | |
| 54 StreamSubscription<T> _sub; | |
| 55 Completer _streamDone; | |
| 56 | |
| 57 final bool _throwOnError; | |
| 58 | |
| 59 Stream _currentStream; | |
| 60 | |
| 61 int _limit = 0; | |
| 62 | |
| 63 set limit(int limit) { | |
| 64 _limit = limit; | |
| 65 if (_sub != null) { | |
| 66 if (!limited || _counter < limit) { | |
| 67 _sub.resume(); | |
| 68 } else { | |
| 69 _sub.pause(); | |
| 70 } | |
| 71 } | |
| 72 } | |
| 73 | |
| 74 int get limit => _limit; | |
| 75 | |
| 76 bool get limited => _limit > 0; | |
| 77 | |
| 78 /** | |
| 79 * Create a stream buffer with optional, soft [limit] to the amount of data | |
| 80 * the buffer will hold before pausing the underlying stream. A limit of 0 | |
| 81 * means no buffer limits. | |
| 82 */ | |
| 83 StreamBuffer({bool throwOnError: false, int limit: 0}) | |
| 84 : this._throwOnError = throwOnError, | |
| 85 this._limit = limit; | |
| 86 | |
| 87 /** | |
| 88 * The amount of unread data buffered. | |
| 89 */ | |
| 90 int get buffered => _counter; | |
| 91 | |
| 92 List<T> _consume(int size) { | |
| 93 var follower = 0; | |
| 94 var ret = new List(size); | |
| 95 var leftToRead = size; | |
| 96 while (leftToRead > 0) { | |
| 97 var chunk = _chunks.first; | |
| 98 var listCap = (chunk is List) ? chunk.length - _offset : 1; | |
| 99 var subsize = leftToRead > listCap ? listCap : leftToRead; | |
| 100 if (chunk is List) { | |
| 101 ret.setRange(follower, follower + subsize, | |
| 102 chunk.getRange(_offset, _offset + subsize)); | |
| 103 } else { | |
| 104 ret[follower] = chunk; | |
| 105 } | |
| 106 follower += subsize; | |
| 107 _offset += subsize; | |
| 108 _counter -= subsize; | |
| 109 leftToRead -= subsize; | |
| 110 if (chunk is! List || _offset >= chunk.length) { | |
| 111 _offset = 0; | |
| 112 _chunks.removeAt(0); | |
| 113 } | |
| 114 } | |
| 115 if (limited && _sub.isPaused && _counter < limit) { | |
| 116 _sub.resume(); | |
| 117 } | |
| 118 return ret; | |
| 119 } | |
| 120 | |
| 121 /** | |
| 122 * Read fully [size] bytes from the stream and return in the future. | |
| 123 * | |
| 124 * Throws [ArgumentError] if size is larger than optional buffer [limit]. | |
| 125 */ | |
| 126 Future<List<T>> read(int size) { | |
| 127 if (limited && size > limit) { | |
| 128 throw new ArgumentError("Cannot read $size with limit $limit"); | |
| 129 } | |
| 130 | |
| 131 // If we have enough data to consume and there are no other readers, then | |
| 132 // we can return immediately. | |
| 133 if (size <= buffered && _readers.isEmpty) { | |
| 134 return new Future.value(_consume(size)); | |
| 135 } | |
| 136 Completer completer = new Completer<List<T>>(); | |
| 137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); | |
| 138 return completer.future; | |
| 139 } | |
| 140 | |
| 141 @override | |
| 142 Future addStream(Stream<T> stream) { | |
| 143 var lastStream = _currentStream == null ? stream : _currentStream; | |
| 144 if (_sub != null) { | |
| 145 _sub.cancel(); | |
| 146 _streamDone.complete(); | |
| 147 } | |
| 148 _currentStream = stream; | |
| 149 Completer streamDone = new Completer(); | |
| 150 _sub = stream.listen((items) { | |
| 151 _chunks.add(items); | |
| 152 _counter += items is List ? items.length : 1; | |
| 153 if (limited && _counter >= limit) { | |
| 154 _sub.pause(); | |
| 155 } | |
| 156 | |
| 157 while (_readers.isNotEmpty && _readers.first.size <= _counter) { | |
| 158 var waiting = _readers.removeAt(0); | |
| 159 waiting.completer.complete(_consume(waiting.size)); | |
| 160 } | |
| 161 }, onDone: () { | |
| 162 // User is piping in a new stream | |
| 163 if (stream == lastStream && _throwOnError) { | |
| 164 _closed(new UnderflowError()); | |
| 165 } | |
| 166 streamDone.complete(); | |
| 167 }, onError: (e) { | |
| 168 _closed(e); | |
| 169 }); | |
| 170 return streamDone.future; | |
| 171 } | |
| 172 | |
| 173 _closed(e) { | |
| 174 for (var reader in _readers) { | |
| 175 if (!reader.completer.isCompleted) { | |
| 176 reader.completer.completeError(e); | |
| 177 } | |
| 178 } | |
| 179 _readers.clear(); | |
| 180 } | |
| 181 | |
| 182 Future close() { | |
| 183 var ret; | |
| 184 if (_sub != null) { | |
| 185 ret = _sub.cancel(); | |
| 186 _sub = null; | |
| 187 } | |
| 188 return ret is Future ? ret : new Future.value(); | |
| 189 } | |
| 190 } | |
| 191 | |
| 192 class _ReaderInWaiting<T> { | |
| 193 int size; | |
| 194 Completer<T> completer; | |
| 195 _ReaderInWaiting(this.size, this.completer); | |
| 196 } | |
| OLD | NEW |