Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
| 9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
| 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay |
| 11 * an [addStream] until the buffer is flushed. | 11 * an [addStream] until the buffer is flushed. |
| 12 * | 12 * |
| 13 * When the [IOSink] is bound to a stream (through [addStream]) any call | 13 * When the [IOSink] is bound to a stream (through [addStream]) any call |
| 14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, | 14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, |
| 15 * the [IOSink] will again be open for all calls. | 15 * the [IOSink] will again be open to all calls. |
|
Lasse Reichstein Nielsen
2014/03/18 15:34:45
... will again accept all method calls.
Anders Johnsen
2014/03/19 11:48:55
Done.
| |
| 16 * | 16 * |
| 17 * If data is added to the [IOSink] after the sink is closed, the data will be | 17 * If data is added to the [IOSink] after the sink is closed, the data will be |
| 18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. | 18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. |
| 19 */ | 19 */ |
| 20 abstract class IOSink implements StreamSink<List<int>>, StringSink { | 20 abstract class IOSink implements StreamSink<List<int>>, StringSink { |
| 21 // TODO(ajohnsen): Make _encodingMutable an argument. | 21 // TODO(ajohnsen): Make _encodingMutable an argument. |
| 22 factory IOSink(StreamConsumer<List<int>> target, | 22 factory IOSink(StreamConsumer<List<int>> target, |
| 23 {Encoding encoding: UTF8}) | 23 {Encoding encoding: UTF8}) |
| 24 => new _IOSinkImpl(target, encoding); | 24 => new _IOSinkImpl(target, encoding); |
| 25 | 25 |
| (...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 117 Future close(); | 117 Future close(); |
| 118 | 118 |
| 119 /** | 119 /** |
| 120 * Get a future that will complete when the consumer closes, or when an | 120 * Get a future that will complete when the consumer closes, or when an |
| 121 * error occurs. This future is identical to the future returned by | 121 * error occurs. This future is identical to the future returned by |
| 122 * [close]. | 122 * [close]. |
| 123 */ | 123 */ |
| 124 Future get done; | 124 Future get done; |
| 125 } | 125 } |
| 126 | 126 |
| 127 class _StreamSinkImpl<T> implements StreamSink<T> { | |
| 128 final StreamConsumer<T> _target; | |
| 129 Completer _doneCompleter = new Completer(); | |
| 130 Future _doneFuture; | |
| 131 StreamController<T> _controllerInstance; | |
| 132 Completer _controllerCompleter; | |
| 133 bool _isClosed = false; | |
| 134 bool _isBound = false; | |
| 135 bool _hasError = false; | |
| 136 | 127 |
| 137 _StreamSinkImpl(this._target) { | 128 class _IOSinkImpl extends StreamSinkAdapter<List<int>> implements IOSink { |
| 138 _doneFuture = _doneCompleter.future; | |
| 139 } | |
| 140 | |
| 141 void add(T data) { | |
| 142 if (_isClosed) return; | |
| 143 _controller.add(data); | |
| 144 } | |
| 145 | |
| 146 void addError(error, [StackTrace stackTrace]) => | |
| 147 _controller.addError(error, stackTrace); | |
| 148 | |
| 149 Future addStream(Stream<T> stream) { | |
| 150 if (_isBound) { | |
| 151 throw new StateError("StreamSink is already bound to a stream"); | |
| 152 } | |
| 153 _isBound = true; | |
| 154 if (_hasError) return done; | |
| 155 // Wait for any sync operations to complete. | |
| 156 Future targetAddStream() { | |
| 157 return _target.addStream(stream) | |
| 158 .whenComplete(() { | |
| 159 _isBound = false; | |
| 160 }); | |
| 161 } | |
| 162 if (_controllerInstance == null) return targetAddStream(); | |
| 163 var future = _controllerCompleter.future; | |
| 164 _controllerInstance.close(); | |
| 165 return future.then((_) => targetAddStream()); | |
| 166 } | |
| 167 | |
| 168 Future flush() { | |
| 169 if (_isBound) { | |
| 170 throw new StateError("StreamSink is bound to a stream"); | |
| 171 } | |
| 172 if (_controllerInstance == null) return new Future.value(this); | |
| 173 // Adding an empty stream-controller will return a future that will complete | |
| 174 // when all data is done. | |
| 175 _isBound = true; | |
| 176 var future = _controllerCompleter.future; | |
| 177 _controllerInstance.close(); | |
| 178 return future.whenComplete(() { | |
| 179 _isBound = false; | |
| 180 }); | |
| 181 } | |
| 182 | |
| 183 Future close() { | |
| 184 if (_isBound) { | |
| 185 throw new StateError("StreamSink is bound to a stream"); | |
| 186 } | |
| 187 if (!_isClosed) { | |
| 188 _isClosed = true; | |
| 189 if (_controllerInstance != null) { | |
| 190 _controllerInstance.close(); | |
| 191 } else { | |
| 192 _closeTarget(); | |
| 193 } | |
| 194 } | |
| 195 return done; | |
| 196 } | |
| 197 | |
| 198 void _closeTarget() { | |
| 199 _target.close() | |
| 200 .then((value) => _completeDone(value: value), | |
| 201 onError: (error) => _completeDone(error: error)); | |
| 202 } | |
| 203 | |
| 204 Future get done => _doneFuture; | |
| 205 | |
| 206 void _completeDone({value, error}) { | |
| 207 if (_doneCompleter == null) return; | |
| 208 if (error == null) { | |
| 209 _doneCompleter.complete(value); | |
| 210 } else { | |
| 211 _hasError = true; | |
| 212 _doneCompleter.completeError(error); | |
| 213 } | |
| 214 _doneCompleter = null; | |
| 215 } | |
| 216 | |
| 217 StreamController<T> get _controller { | |
| 218 if (_isBound) { | |
| 219 throw new StateError("StreamSink is bound to a stream"); | |
| 220 } | |
| 221 if (_isClosed) { | |
| 222 throw new StateError("StreamSink is closed"); | |
| 223 } | |
| 224 if (_controllerInstance == null) { | |
| 225 _controllerInstance = new StreamController<T>(sync: true); | |
| 226 _controllerCompleter = new Completer(); | |
| 227 _target.addStream(_controller.stream) | |
| 228 .then( | |
| 229 (_) { | |
| 230 if (_isBound) { | |
| 231 // A new stream takes over - forward values to that stream. | |
| 232 _controllerCompleter.complete(this); | |
| 233 _controllerCompleter = null; | |
| 234 _controllerInstance = null; | |
| 235 } else { | |
| 236 // No new stream, .close was called. Close _target. | |
| 237 _closeTarget(); | |
| 238 } | |
| 239 }, | |
| 240 onError: (error) { | |
| 241 if (_isBound) { | |
| 242 // A new stream takes over - forward errors to that stream. | |
| 243 _controllerCompleter.completeError(error); | |
| 244 _controllerCompleter = null; | |
| 245 _controllerInstance = null; | |
| 246 } else { | |
| 247 // No new stream. No need to close target, as it have already | |
| 248 // failed. | |
| 249 _completeDone(error: error); | |
| 250 } | |
| 251 }); | |
| 252 } | |
| 253 return _controllerInstance; | |
| 254 } | |
| 255 } | |
| 256 | |
| 257 | |
| 258 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { | |
| 259 Encoding _encoding; | 129 Encoding _encoding; |
| 260 bool _encodingMutable = true; | 130 bool _encodingMutable = true; |
| 261 | 131 |
| 262 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) | 132 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) |
| 263 : super(target); | 133 : super(target); |
| 264 | 134 |
| 265 Encoding get encoding => _encoding; | 135 Encoding get encoding => _encoding; |
| 266 | 136 |
| 267 void set encoding(Encoding value) { | 137 void set encoding(Encoding value) { |
| 268 if (!_encodingMutable) { | 138 if (!_encodingMutable) { |
| (...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 306 | 176 |
| 307 void writeln([Object obj = ""]) { | 177 void writeln([Object obj = ""]) { |
| 308 write(obj); | 178 write(obj); |
| 309 write("\n"); | 179 write("\n"); |
| 310 } | 180 } |
| 311 | 181 |
| 312 void writeCharCode(int charCode) { | 182 void writeCharCode(int charCode) { |
| 313 write(new String.fromCharCode(charCode)); | 183 write(new String.fromCharCode(charCode)); |
| 314 } | 184 } |
| 315 } | 185 } |
| OLD | NEW |