OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay |
(...skipping 125 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
136 | 136 |
137 _StreamSinkImpl(this._target) { | 137 _StreamSinkImpl(this._target) { |
138 _doneFuture = _doneCompleter.future; | 138 _doneFuture = _doneCompleter.future; |
139 } | 139 } |
140 | 140 |
141 void add(T data) { | 141 void add(T data) { |
142 if (_isClosed) return; | 142 if (_isClosed) return; |
143 _controller.add(data); | 143 _controller.add(data); |
144 } | 144 } |
145 | 145 |
146 void addError(error, [StackTrace stackTrace]) => | 146 void addError(error, [StackTrace stackTrace]) { |
147 _controller.addError(error, stackTrace); | 147 _controller.addError(error, stackTrace); |
| 148 } |
148 | 149 |
149 Future addStream(Stream<T> stream) { | 150 Future addStream(Stream<T> stream) { |
150 if (_isBound) { | 151 if (_isBound) { |
151 throw new StateError("StreamSink is already bound to a stream"); | 152 throw new StateError("StreamSink is already bound to a stream"); |
152 } | 153 } |
153 _isBound = true; | 154 _isBound = true; |
154 if (_hasError) return done; | 155 if (_hasError) return done; |
155 // Wait for any sync operations to complete. | 156 // Wait for any sync operations to complete. |
156 Future targetAddStream() { | 157 Future targetAddStream() { |
157 return _target.addStream(stream) | 158 return _target.addStream(stream) |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
189 if (_controllerInstance != null) { | 190 if (_controllerInstance != null) { |
190 _controllerInstance.close(); | 191 _controllerInstance.close(); |
191 } else { | 192 } else { |
192 _closeTarget(); | 193 _closeTarget(); |
193 } | 194 } |
194 } | 195 } |
195 return done; | 196 return done; |
196 } | 197 } |
197 | 198 |
198 void _closeTarget() { | 199 void _closeTarget() { |
199 _target.close() | 200 _target.close().then(_completeDoneValue, onError: _completeDoneError); |
200 .then((value) => _completeDone(value: value), | |
201 onError: (error) => _completeDone(error: error)); | |
202 } | 201 } |
203 | 202 |
204 Future get done => _doneFuture; | 203 Future get done => _doneFuture; |
205 | 204 |
206 void _completeDone({value, error}) { | 205 void _completeDoneValue(value) { |
207 if (_doneCompleter == null) return; | 206 if (_doneCompleter == null) return; |
208 if (error == null) { | 207 _doneCompleter.complete(value); |
209 _doneCompleter.complete(value); | |
210 } else { | |
211 _hasError = true; | |
212 _doneCompleter.completeError(error); | |
213 } | |
214 _doneCompleter = null; | 208 _doneCompleter = null; |
215 } | 209 } |
216 | 210 |
| 211 void _completeDoneError(error, StackTrace stackTrace) { |
| 212 if (_doneCompleter == null) return; |
| 213 _hasError = true; |
| 214 _doneCompleter.completeError(error, stackTrace); |
| 215 _doneCompleter = null; |
| 216 } |
| 217 |
217 StreamController<T> get _controller { | 218 StreamController<T> get _controller { |
218 if (_isBound) { | 219 if (_isBound) { |
219 throw new StateError("StreamSink is bound to a stream"); | 220 throw new StateError("StreamSink is bound to a stream"); |
220 } | 221 } |
221 if (_isClosed) { | 222 if (_isClosed) { |
222 throw new StateError("StreamSink is closed"); | 223 throw new StateError("StreamSink is closed"); |
223 } | 224 } |
224 if (_controllerInstance == null) { | 225 if (_controllerInstance == null) { |
225 _controllerInstance = new StreamController<T>(sync: true); | 226 _controllerInstance = new StreamController<T>(sync: true); |
226 _controllerCompleter = new Completer(); | 227 _controllerCompleter = new Completer(); |
227 _target.addStream(_controller.stream) | 228 _target.addStream(_controller.stream) |
228 .then( | 229 .then( |
229 (_) { | 230 (_) { |
230 if (_isBound) { | 231 if (_isBound) { |
231 // A new stream takes over - forward values to that stream. | 232 // A new stream takes over - forward values to that stream. |
232 _controllerCompleter.complete(this); | 233 _controllerCompleter.complete(this); |
233 _controllerCompleter = null; | 234 _controllerCompleter = null; |
234 _controllerInstance = null; | 235 _controllerInstance = null; |
235 } else { | 236 } else { |
236 // No new stream, .close was called. Close _target. | 237 // No new stream, .close was called. Close _target. |
237 _closeTarget(); | 238 _closeTarget(); |
238 } | 239 } |
239 }, | 240 }, |
240 onError: (error) { | 241 onError: (error, stackTrace) { |
241 if (_isBound) { | 242 if (_isBound) { |
242 // A new stream takes over - forward errors to that stream. | 243 // A new stream takes over - forward errors to that stream. |
243 _controllerCompleter.completeError(error); | 244 _controllerCompleter.completeError(error, stackTrace); |
244 _controllerCompleter = null; | 245 _controllerCompleter = null; |
245 _controllerInstance = null; | 246 _controllerInstance = null; |
246 } else { | 247 } else { |
247 // No new stream. No need to close target, as it have already | 248 // No new stream. No need to close target, as it have already |
248 // failed. | 249 // failed. |
249 _completeDone(error: error); | 250 _completeDoneError(error, stackTrace); |
250 } | 251 } |
251 }); | 252 }); |
252 } | 253 } |
253 return _controllerInstance; | 254 return _controllerInstance; |
254 } | 255 } |
255 } | 256 } |
256 | 257 |
257 | 258 |
258 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { | 259 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { |
259 Encoding _encoding; | 260 Encoding _encoding; |
(...skipping 46 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
306 | 307 |
307 void writeln([Object obj = ""]) { | 308 void writeln([Object obj = ""]) { |
308 write(obj); | 309 write(obj); |
309 write("\n"); | 310 write("\n"); |
310 } | 311 } |
311 | 312 |
312 void writeCharCode(int charCode) { | 313 void writeCharCode(int charCode) { |
313 write(new String.fromCharCode(charCode)); | 314 write(new String.fromCharCode(charCode)); |
314 } | 315 } |
315 } | 316 } |
OLD | NEW |