OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay |
11 * an [addStream] until the buffer is flushed. | 11 * an [addStream] until the buffer is flushed. |
12 * | 12 * |
13 * When the [IOSink] is bound to a stream (through [addStream]) any call | 13 * When the [IOSink] is bound to a stream (through [addStream]) any call |
14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, | 14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, |
15 * the [IOSink] will again be open for all calls. | 15 * the [IOSink] will again accept all method calls. |
16 * | 16 * |
17 * If data is added to the [IOSink] after the sink is closed, the data will be | 17 * If data is added to the [IOSink] after the sink is closed, the data will be |
18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. | 18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. |
19 */ | 19 */ |
20 abstract class IOSink implements StreamSink<List<int>>, StringSink { | 20 abstract class IOSink implements StreamSink<List<int>>, StringSink { |
21 // TODO(ajohnsen): Make _encodingMutable an argument. | 21 // TODO(ajohnsen): Make _encodingMutable an argument. |
22 factory IOSink(StreamConsumer<List<int>> target, | 22 factory IOSink(StreamConsumer<List<int>> target, |
23 {Encoding encoding: UTF8}) | 23 {Encoding encoding: UTF8}) |
24 => new _IOSinkImpl(target, encoding); | 24 => new _IOSinkImpl(target, encoding); |
25 | 25 |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
117 Future close(); | 117 Future close(); |
118 | 118 |
119 /** | 119 /** |
120 * Get a future that will complete when the consumer closes, or when an | 120 * Get a future that will complete when the consumer closes, or when an |
121 * error occurs. This future is identical to the future returned by | 121 * error occurs. This future is identical to the future returned by |
122 * [close]. | 122 * [close]. |
123 */ | 123 */ |
124 Future get done; | 124 Future get done; |
125 } | 125 } |
126 | 126 |
127 class _StreamSinkImpl<T> implements StreamSink<T> { | |
128 final StreamConsumer<T> _target; | |
129 Completer _doneCompleter = new Completer(); | |
130 Future _doneFuture; | |
131 StreamController<T> _controllerInstance; | |
132 Completer _controllerCompleter; | |
133 bool _isClosed = false; | |
134 bool _isBound = false; | |
135 bool _hasError = false; | |
136 | 127 |
137 _StreamSinkImpl(this._target) { | 128 class _IOSinkImpl extends StreamSinkAdapter<List<int>> implements IOSink { |
138 _doneFuture = _doneCompleter.future; | |
139 } | |
140 | |
141 void add(T data) { | |
142 if (_isClosed) return; | |
143 _controller.add(data); | |
144 } | |
145 | |
146 void addError(error, [StackTrace stackTrace]) => | |
147 _controller.addError(error, stackTrace); | |
148 | |
149 Future addStream(Stream<T> stream) { | |
150 if (_isBound) { | |
151 throw new StateError("StreamSink is already bound to a stream"); | |
152 } | |
153 _isBound = true; | |
154 if (_hasError) return done; | |
155 // Wait for any sync operations to complete. | |
156 Future targetAddStream() { | |
157 return _target.addStream(stream) | |
158 .whenComplete(() { | |
159 _isBound = false; | |
160 }); | |
161 } | |
162 if (_controllerInstance == null) return targetAddStream(); | |
163 var future = _controllerCompleter.future; | |
164 _controllerInstance.close(); | |
165 return future.then((_) => targetAddStream()); | |
166 } | |
167 | |
168 Future flush() { | |
169 if (_isBound) { | |
170 throw new StateError("StreamSink is bound to a stream"); | |
171 } | |
172 if (_controllerInstance == null) return new Future.value(this); | |
173 // Adding an empty stream-controller will return a future that will complete | |
174 // when all data is done. | |
175 _isBound = true; | |
176 var future = _controllerCompleter.future; | |
177 _controllerInstance.close(); | |
178 return future.whenComplete(() { | |
179 _isBound = false; | |
180 }); | |
181 } | |
182 | |
183 Future close() { | |
184 if (_isBound) { | |
185 throw new StateError("StreamSink is bound to a stream"); | |
186 } | |
187 if (!_isClosed) { | |
188 _isClosed = true; | |
189 if (_controllerInstance != null) { | |
190 _controllerInstance.close(); | |
191 } else { | |
192 _closeTarget(); | |
193 } | |
194 } | |
195 return done; | |
196 } | |
197 | |
198 void _closeTarget() { | |
199 _target.close() | |
200 .then((value) => _completeDone(value: value), | |
201 onError: (error) => _completeDone(error: error)); | |
202 } | |
203 | |
204 Future get done => _doneFuture; | |
205 | |
206 void _completeDone({value, error}) { | |
207 if (_doneCompleter == null) return; | |
208 if (error == null) { | |
209 _doneCompleter.complete(value); | |
210 } else { | |
211 _hasError = true; | |
212 _doneCompleter.completeError(error); | |
213 } | |
214 _doneCompleter = null; | |
215 } | |
216 | |
217 StreamController<T> get _controller { | |
218 if (_isBound) { | |
219 throw new StateError("StreamSink is bound to a stream"); | |
220 } | |
221 if (_isClosed) { | |
222 throw new StateError("StreamSink is closed"); | |
223 } | |
224 if (_controllerInstance == null) { | |
225 _controllerInstance = new StreamController<T>(sync: true); | |
226 _controllerCompleter = new Completer(); | |
227 _target.addStream(_controller.stream) | |
228 .then( | |
229 (_) { | |
230 if (_isBound) { | |
231 // A new stream takes over - forward values to that stream. | |
232 _controllerCompleter.complete(this); | |
233 _controllerCompleter = null; | |
234 _controllerInstance = null; | |
235 } else { | |
236 // No new stream, .close was called. Close _target. | |
237 _closeTarget(); | |
238 } | |
239 }, | |
240 onError: (error) { | |
241 if (_isBound) { | |
242 // A new stream takes over - forward errors to that stream. | |
243 _controllerCompleter.completeError(error); | |
244 _controllerCompleter = null; | |
245 _controllerInstance = null; | |
246 } else { | |
247 // No new stream. No need to close target, as it have already | |
248 // failed. | |
249 _completeDone(error: error); | |
250 } | |
251 }); | |
252 } | |
253 return _controllerInstance; | |
254 } | |
255 } | |
256 | |
257 | |
258 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { | |
259 Encoding _encoding; | 129 Encoding _encoding; |
260 bool _encodingMutable = true; | 130 bool _encodingMutable = true; |
261 | 131 |
262 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) | 132 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) |
263 : super(target); | 133 : super(target); |
264 | 134 |
265 Encoding get encoding => _encoding; | 135 Encoding get encoding => _encoding; |
266 | 136 |
267 void set encoding(Encoding value) { | 137 void set encoding(Encoding value) { |
268 if (!_encodingMutable) { | 138 if (!_encodingMutable) { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
306 | 176 |
307 void writeln([Object obj = ""]) { | 177 void writeln([Object obj = ""]) { |
308 write(obj); | 178 write(obj); |
309 write("\n"); | 179 write("\n"); |
310 } | 180 } |
311 | 181 |
312 void writeCharCode(int charCode) { | 182 void writeCharCode(int charCode) { |
313 write(new String.fromCharCode(charCode)); | 183 write(new String.fromCharCode(charCode)); |
314 } | 184 } |
315 } | 185 } |
OLD | NEW |