OLD | NEW |
1 // Copyright 2014 Google Inc. All Rights Reserved. | 1 // Copyright 2014 Google Inc. All Rights Reserved. |
2 // | 2 // |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | 3 // Licensed under the Apache License, Version 2.0 (the "License"); |
4 // you may not use this file except in compliance with the License. | 4 // you may not use this file except in compliance with the License. |
5 // You may obtain a copy of the License at | 5 // You may obtain a copy of the License at |
6 // | 6 // |
7 // http://www.apache.org/licenses/LICENSE-2.0 | 7 // http://www.apache.org/licenses/LICENSE-2.0 |
8 // | 8 // |
9 // Unless required by applicable law or agreed to in writing, software | 9 // Unless required by applicable law or agreed to in writing, software |
10 // distributed under the License is distributed on an "AS IS" BASIS, | 10 // distributed under the License is distributed on an "AS IS" BASIS, |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 // See the License for the specific language governing permissions and | 12 // See the License for the specific language governing permissions and |
13 // limitations under the License. | 13 // limitations under the License. |
14 | 14 |
15 part of quiver.streams; | 15 part of quiver.async; |
16 | 16 |
17 /** | 17 /// Underflow errors happen when the socket feeding a buffer is finished while |
18 * Underflow errors happen when the socket feeding a buffer is finished while | 18 /// there are still blocked readers. Each reader will complete with this error. |
19 * there are still blocked readers. Each reader will complete with this error. | |
20 */ | |
21 class UnderflowError extends Error { | 19 class UnderflowError extends Error { |
22 final message; | 20 final message; |
23 | 21 |
24 /// The [message] describes the underflow. | 22 /// The [message] describes the underflow. |
25 UnderflowError([this.message]); | 23 UnderflowError([this.message]); |
26 | 24 |
27 String toString() { | 25 String toString() { |
28 if (message != null) { | 26 if (message != null) { |
29 return "StreamBuffer Underflow: $message"; | 27 return "StreamBuffer Underflow: $message"; |
30 } | 28 } |
31 return "StreamBuffer Underflow"; | 29 return "StreamBuffer Underflow"; |
32 } | 30 } |
33 } | 31 } |
34 | 32 |
35 /** | 33 /// Allow orderly reading of elements from a datastream, such as Socket, which |
36 * Allow orderly reading of elements from a datastream, such as Socket, which | 34 /// might not receive List<int> bytes regular chunks. |
37 * might not receive List<int> bytes regular chunks. | 35 /// |
38 * | 36 /// Example usage: |
39 * Example usage: | 37 /// StreamBuffer<int> buffer = new StreamBuffer(); |
40 * StreamBuffer<int> buffer = new StreamBuffer(); | 38 /// Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); |
41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); | 39 /// buffer.read(100).then((bytes) { |
42 * buffer.read(100).then((bytes) { | 40 /// // do something with 100 bytes; |
43 * // do something with 100 bytes; | 41 /// }); |
44 * }); | 42 /// |
45 * | 43 /// Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected |
46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected | 44 /// [Socket] disconnects. |
47 * [Socket] disconnects. | 45 class StreamBuffer<T> implements StreamConsumer<T> { |
48 */ | 46 List<T> _chunks = []; |
49 class StreamBuffer<T> implements StreamConsumer { | |
50 List _chunks = []; | |
51 int _offset = 0; | 47 int _offset = 0; |
52 int _counter = 0; // sum(_chunks[*].length) - _offset | 48 int _counter = 0; // sum(_chunks[*].length) - _offset |
53 List<_ReaderInWaiting<List<T>>> _readers = []; | 49 List<_ReaderInWaiting<List<T>>> _readers = []; |
54 StreamSubscription<T> _sub; | 50 StreamSubscription<T> _sub; |
55 Completer _streamDone; | 51 Completer _streamDone; |
56 | 52 |
57 final bool _throwOnError; | 53 final bool _throwOnError; |
58 | 54 |
59 Stream _currentStream; | 55 Stream _currentStream; |
60 | 56 |
61 int _limit = 0; | 57 int _limit = 0; |
62 | 58 |
63 set limit(int limit) { | 59 set limit(int limit) { |
64 _limit = limit; | 60 _limit = limit; |
65 if (_sub != null) { | 61 if (_sub != null) { |
66 if (!limited || _counter < limit) { | 62 if (!limited || _counter < limit) { |
67 _sub.resume(); | 63 _sub.resume(); |
68 } else { | 64 } else { |
69 _sub.pause(); | 65 _sub.pause(); |
70 } | 66 } |
71 } | 67 } |
72 } | 68 } |
73 | 69 |
74 int get limit => _limit; | 70 int get limit => _limit; |
75 | 71 |
76 bool get limited => _limit > 0; | 72 bool get limited => _limit > 0; |
77 | 73 |
78 /** | 74 /// Create a stream buffer with optional, soft [limit] to the amount of data |
79 * Create a stream buffer with optional, soft [limit] to the amount of data | 75 /// the buffer will hold before pausing the underlying stream. A limit of 0 |
80 * the buffer will hold before pausing the underlying stream. A limit of 0 | 76 /// means no buffer limits. |
81 * means no buffer limits. | |
82 */ | |
83 StreamBuffer({bool throwOnError: false, int limit: 0}) | 77 StreamBuffer({bool throwOnError: false, int limit: 0}) |
84 : this._throwOnError = throwOnError, | 78 : this._throwOnError = throwOnError, |
85 this._limit = limit; | 79 this._limit = limit; |
86 | 80 |
87 /** | 81 /// The amount of unread data buffered. |
88 * The amount of unread data buffered. | |
89 */ | |
90 int get buffered => _counter; | 82 int get buffered => _counter; |
91 | 83 |
92 List<T> _consume(int size) { | 84 List<T> _consume(int size) { |
93 var follower = 0; | 85 var follower = 0; |
94 var ret = new List(size); | 86 var ret = new List<T>(size); |
95 var leftToRead = size; | 87 var leftToRead = size; |
96 while (leftToRead > 0) { | 88 while (leftToRead > 0) { |
97 var chunk = _chunks.first; | 89 var chunk = _chunks.first; |
98 var listCap = (chunk is List) ? chunk.length - _offset : 1; | 90 var listCap = (chunk is List) ? chunk.length - _offset : 1; |
99 var subsize = leftToRead > listCap ? listCap : leftToRead; | 91 var subsize = leftToRead > listCap ? listCap : leftToRead; |
100 if (chunk is List) { | 92 if (chunk is List) { |
101 ret.setRange(follower, follower + subsize, | 93 ret.setRange(follower, follower + subsize, |
102 chunk.getRange(_offset, _offset + subsize)); | 94 (chunk as List<T>).getRange(_offset, _offset + subsize)); |
103 } else { | 95 } else { |
104 ret[follower] = chunk; | 96 ret[follower] = chunk; |
105 } | 97 } |
106 follower += subsize; | 98 follower += subsize; |
107 _offset += subsize; | 99 _offset += subsize; |
108 _counter -= subsize; | 100 _counter -= subsize; |
109 leftToRead -= subsize; | 101 leftToRead -= subsize; |
110 if (chunk is! List || _offset >= chunk.length) { | 102 if (chunk is! List || _offset >= (chunk as List).length) { |
111 _offset = 0; | 103 _offset = 0; |
112 _chunks.removeAt(0); | 104 _chunks.removeAt(0); |
113 } | 105 } |
114 } | 106 } |
115 if (limited && _sub.isPaused && _counter < limit) { | 107 if (limited && _sub.isPaused && _counter < limit) { |
116 _sub.resume(); | 108 _sub.resume(); |
117 } | 109 } |
118 return ret; | 110 return ret; |
119 } | 111 } |
120 | 112 |
121 /** | 113 /// Read fully [size] bytes from the stream and return in the future. |
122 * Read fully [size] bytes from the stream and return in the future. | 114 /// |
123 * | 115 /// Throws [ArgumentError] if size is larger than optional buffer [limit]. |
124 * Throws [ArgumentError] if size is larger than optional buffer [limit]. | |
125 */ | |
126 Future<List<T>> read(int size) { | 116 Future<List<T>> read(int size) { |
127 if (limited && size > limit) { | 117 if (limited && size > limit) { |
128 throw new ArgumentError("Cannot read $size with limit $limit"); | 118 throw new ArgumentError("Cannot read $size with limit $limit"); |
129 } | 119 } |
130 | 120 |
131 // If we have enough data to consume and there are no other readers, then | 121 // If we have enough data to consume and there are no other readers, then |
132 // we can return immediately. | 122 // we can return immediately. |
133 if (size <= buffered && _readers.isEmpty) { | 123 if (size <= buffered && _readers.isEmpty) { |
134 return new Future.value(_consume(size)); | 124 return new Future.value(_consume(size)); |
135 } | 125 } |
136 Completer completer = new Completer<List<T>>(); | 126 final completer = new Completer<List<T>>(); |
137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); | 127 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); |
138 return completer.future; | 128 return completer.future; |
139 } | 129 } |
140 | 130 |
141 @override | 131 @override |
142 Future addStream(Stream<T> stream) { | 132 Future addStream(Stream<T> stream) { |
143 var lastStream = _currentStream == null ? stream : _currentStream; | 133 var lastStream = _currentStream == null ? stream : _currentStream; |
144 if (_sub != null) { | 134 if (_sub != null) { |
145 _sub.cancel(); | 135 _sub.cancel(); |
146 _streamDone.complete(); | 136 _streamDone.complete(); |
(...skipping 10 matching lines...) Expand all Loading... |
157 while (_readers.isNotEmpty && _readers.first.size <= _counter) { | 147 while (_readers.isNotEmpty && _readers.first.size <= _counter) { |
158 var waiting = _readers.removeAt(0); | 148 var waiting = _readers.removeAt(0); |
159 waiting.completer.complete(_consume(waiting.size)); | 149 waiting.completer.complete(_consume(waiting.size)); |
160 } | 150 } |
161 }, onDone: () { | 151 }, onDone: () { |
162 // User is piping in a new stream | 152 // User is piping in a new stream |
163 if (stream == lastStream && _throwOnError) { | 153 if (stream == lastStream && _throwOnError) { |
164 _closed(new UnderflowError()); | 154 _closed(new UnderflowError()); |
165 } | 155 } |
166 streamDone.complete(); | 156 streamDone.complete(); |
167 }, onError: (e) { | 157 }, onError: (e, stack) { |
168 _closed(e); | 158 _closed(e, stack); |
169 }); | 159 }); |
170 return streamDone.future; | 160 return streamDone.future; |
171 } | 161 } |
172 | 162 |
173 _closed(e) { | 163 void _closed(e, [StackTrace stack]) { |
174 for (var reader in _readers) { | 164 for (var reader in _readers) { |
175 if (!reader.completer.isCompleted) { | 165 if (!reader.completer.isCompleted) { |
176 reader.completer.completeError(e); | 166 reader.completer.completeError(e, stack); |
177 } | 167 } |
178 } | 168 } |
179 _readers.clear(); | 169 _readers.clear(); |
180 } | 170 } |
181 | 171 |
182 Future close() { | 172 Future close() { |
183 var ret; | 173 var ret; |
184 if (_sub != null) { | 174 if (_sub != null) { |
185 ret = _sub.cancel(); | 175 ret = _sub.cancel(); |
186 _sub = null; | 176 _sub = null; |
187 } | 177 } |
188 return ret is Future ? ret : new Future.value(); | 178 return ret is Future ? ret : new Future.value(); |
189 } | 179 } |
190 } | 180 } |
191 | 181 |
192 class _ReaderInWaiting<T> { | 182 class _ReaderInWaiting<T> { |
193 int size; | 183 int size; |
194 Completer<T> completer; | 184 Completer<T> completer; |
195 _ReaderInWaiting(this.size, this.completer); | 185 _ReaderInWaiting(this.size, this.completer); |
196 } | 186 } |
OLD | NEW |