OLD | NEW |
| (Empty) |
1 // Copyright 2014 Google Inc. All Rights Reserved. | |
2 // | |
3 // Licensed under the Apache License, Version 2.0 (the "License"); | |
4 // you may not use this file except in compliance with the License. | |
5 // You may obtain a copy of the License at | |
6 // | |
7 // http://www.apache.org/licenses/LICENSE-2.0 | |
8 // | |
9 // Unless required by applicable law or agreed to in writing, software | |
10 // distributed under the License is distributed on an "AS IS" BASIS, | |
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 // See the License for the specific language governing permissions and | |
13 // limitations under the License. | |
14 | |
15 part of quiver.streams; | |
16 | |
17 /** | |
18 * Underflow errors happen when the socket feeding a buffer is finished while | |
19 * there are still blocked readers. Each reader will complete with this error. | |
20 */ | |
21 class UnderflowError extends Error { | |
22 final message; | |
23 | |
24 /// The [message] describes the underflow. | |
25 UnderflowError([this.message]); | |
26 | |
27 String toString() { | |
28 if (message != null) { | |
29 return "StreamBuffer Underflow: $message"; | |
30 } | |
31 return "StreamBuffer Underflow"; | |
32 } | |
33 } | |
34 | |
35 /** | |
36 * Allow orderly reading of elements from a datastream, such as Socket, which | |
37 * might not receive List<int> bytes regular chunks. | |
38 * | |
39 * Example usage: | |
40 * StreamBuffer<int> buffer = new StreamBuffer(); | |
41 * Socket.connect('127.0.0.1', 5555).then((sock) => sock.pipe(buffer)); | |
42 * buffer.read(100).then((bytes) { | |
43 * // do something with 100 bytes; | |
44 * }); | |
45 * | |
46 * Throws [UnderflowError] if [throwOnError] is true. Useful for unexpected | |
47 * [Socket] disconnects. | |
48 */ | |
49 class StreamBuffer<T> implements StreamConsumer { | |
50 List _chunks = []; | |
51 int _offset = 0; | |
52 int _counter = 0; // sum(_chunks[*].length) - _offset | |
53 List<_ReaderInWaiting<List<T>>> _readers = []; | |
54 StreamSubscription<T> _sub; | |
55 Completer _streamDone; | |
56 | |
57 final bool _throwOnError; | |
58 | |
59 Stream _currentStream; | |
60 | |
61 int _limit = 0; | |
62 | |
63 set limit(int limit) { | |
64 _limit = limit; | |
65 if (_sub != null) { | |
66 if (!limited || _counter < limit) { | |
67 _sub.resume(); | |
68 } else { | |
69 _sub.pause(); | |
70 } | |
71 } | |
72 } | |
73 | |
74 int get limit => _limit; | |
75 | |
76 bool get limited => _limit > 0; | |
77 | |
78 /** | |
79 * Create a stream buffer with optional, soft [limit] to the amount of data | |
80 * the buffer will hold before pausing the underlying stream. A limit of 0 | |
81 * means no buffer limits. | |
82 */ | |
83 StreamBuffer({bool throwOnError: false, int limit: 0}) | |
84 : this._throwOnError = throwOnError, | |
85 this._limit = limit; | |
86 | |
87 /** | |
88 * The amount of unread data buffered. | |
89 */ | |
90 int get buffered => _counter; | |
91 | |
92 List<T> _consume(int size) { | |
93 var follower = 0; | |
94 var ret = new List(size); | |
95 var leftToRead = size; | |
96 while (leftToRead > 0) { | |
97 var chunk = _chunks.first; | |
98 var listCap = (chunk is List) ? chunk.length - _offset : 1; | |
99 var subsize = leftToRead > listCap ? listCap : leftToRead; | |
100 if (chunk is List) { | |
101 ret.setRange(follower, follower + subsize, | |
102 chunk.getRange(_offset, _offset + subsize)); | |
103 } else { | |
104 ret[follower] = chunk; | |
105 } | |
106 follower += subsize; | |
107 _offset += subsize; | |
108 _counter -= subsize; | |
109 leftToRead -= subsize; | |
110 if (chunk is! List || _offset >= chunk.length) { | |
111 _offset = 0; | |
112 _chunks.removeAt(0); | |
113 } | |
114 } | |
115 if (limited && _sub.isPaused && _counter < limit) { | |
116 _sub.resume(); | |
117 } | |
118 return ret; | |
119 } | |
120 | |
121 /** | |
122 * Read fully [size] bytes from the stream and return in the future. | |
123 * | |
124 * Throws [ArgumentError] if size is larger than optional buffer [limit]. | |
125 */ | |
126 Future<List<T>> read(int size) { | |
127 if (limited && size > limit) { | |
128 throw new ArgumentError("Cannot read $size with limit $limit"); | |
129 } | |
130 | |
131 // If we have enough data to consume and there are no other readers, then | |
132 // we can return immediately. | |
133 if (size <= buffered && _readers.isEmpty) { | |
134 return new Future.value(_consume(size)); | |
135 } | |
136 Completer completer = new Completer<List<T>>(); | |
137 _readers.add(new _ReaderInWaiting<List<T>>(size, completer)); | |
138 return completer.future; | |
139 } | |
140 | |
141 @override | |
142 Future addStream(Stream<T> stream) { | |
143 var lastStream = _currentStream == null ? stream : _currentStream; | |
144 if (_sub != null) { | |
145 _sub.cancel(); | |
146 _streamDone.complete(); | |
147 } | |
148 _currentStream = stream; | |
149 Completer streamDone = new Completer(); | |
150 _sub = stream.listen((items) { | |
151 _chunks.add(items); | |
152 _counter += items is List ? items.length : 1; | |
153 if (limited && _counter >= limit) { | |
154 _sub.pause(); | |
155 } | |
156 | |
157 while (_readers.isNotEmpty && _readers.first.size <= _counter) { | |
158 var waiting = _readers.removeAt(0); | |
159 waiting.completer.complete(_consume(waiting.size)); | |
160 } | |
161 }, onDone: () { | |
162 // User is piping in a new stream | |
163 if (stream == lastStream && _throwOnError) { | |
164 _closed(new UnderflowError()); | |
165 } | |
166 streamDone.complete(); | |
167 }, onError: (e) { | |
168 _closed(e); | |
169 }); | |
170 return streamDone.future; | |
171 } | |
172 | |
173 _closed(e) { | |
174 for (var reader in _readers) { | |
175 if (!reader.completer.isCompleted) { | |
176 reader.completer.completeError(e); | |
177 } | |
178 } | |
179 _readers.clear(); | |
180 } | |
181 | |
182 Future close() { | |
183 var ret; | |
184 if (_sub != null) { | |
185 ret = _sub.cancel(); | |
186 _sub = null; | |
187 } | |
188 return ret is Future ? ret : new Future.value(); | |
189 } | |
190 } | |
191 | |
192 class _ReaderInWaiting<T> { | |
193 int size; | |
194 Completer<T> completer; | |
195 _ReaderInWaiting(this.size, this.completer); | |
196 } | |
OLD | NEW |