OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay |
11 * an [addStream] until the buffer is flushed. | 11 * an [addStream] until the buffer is flushed. |
12 * | 12 * |
13 * When the [IOSink] is bound to a stream (through [addStream]) any call | 13 * When the [IOSink] is bound to a stream (through [addStream]) any call |
14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, | 14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, |
15 * the [IOSink] will again accept all method calls. | 15 * the [IOSink] will again be open for all calls. |
16 * | 16 * |
17 * If data is added to the [IOSink] after the sink is closed, the data will be | 17 * If data is added to the [IOSink] after the sink is closed, the data will be |
18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. | 18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. |
19 */ | 19 */ |
20 abstract class IOSink implements StreamSink<List<int>>, StringSink { | 20 abstract class IOSink implements StreamSink<List<int>>, StringSink { |
21 // TODO(ajohnsen): Make _encodingMutable an argument. | 21 // TODO(ajohnsen): Make _encodingMutable an argument. |
22 factory IOSink(StreamConsumer<List<int>> target, | 22 factory IOSink(StreamConsumer<List<int>> target, |
23 {Encoding encoding: UTF8}) | 23 {Encoding encoding: UTF8}) |
24 => new _IOSinkImpl(target, encoding); | 24 => new _IOSinkImpl(target, encoding); |
25 | 25 |
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
117 Future close(); | 117 Future close(); |
118 | 118 |
119 /** | 119 /** |
120 * Get a future that will complete when the consumer closes, or when an | 120 * Get a future that will complete when the consumer closes, or when an |
121 * error occurs. This future is identical to the future returned by | 121 * error occurs. This future is identical to the future returned by |
122 * [close]. | 122 * [close]. |
123 */ | 123 */ |
124 Future get done; | 124 Future get done; |
125 } | 125 } |
126 | 126 |
| 127 class _StreamSinkImpl<T> implements StreamSink<T> { |
| 128 final StreamConsumer<T> _target; |
| 129 Completer _doneCompleter = new Completer(); |
| 130 Future _doneFuture; |
| 131 StreamController<T> _controllerInstance; |
| 132 Completer _controllerCompleter; |
| 133 bool _isClosed = false; |
| 134 bool _isBound = false; |
| 135 bool _hasError = false; |
127 | 136 |
128 class _IOSinkImpl extends StreamSinkAdapter<List<int>> implements IOSink { | 137 _StreamSinkImpl(this._target) { |
| 138 _doneFuture = _doneCompleter.future; |
| 139 } |
| 140 |
| 141 void add(T data) { |
| 142 if (_isClosed) return; |
| 143 _controller.add(data); |
| 144 } |
| 145 |
| 146 void addError(error, [StackTrace stackTrace]) => |
| 147 _controller.addError(error, stackTrace); |
| 148 |
| 149 Future addStream(Stream<T> stream) { |
| 150 if (_isBound) { |
| 151 throw new StateError("StreamSink is already bound to a stream"); |
| 152 } |
| 153 _isBound = true; |
| 154 if (_hasError) return done; |
| 155 // Wait for any sync operations to complete. |
| 156 Future targetAddStream() { |
| 157 return _target.addStream(stream) |
| 158 .whenComplete(() { |
| 159 _isBound = false; |
| 160 }); |
| 161 } |
| 162 if (_controllerInstance == null) return targetAddStream(); |
| 163 var future = _controllerCompleter.future; |
| 164 _controllerInstance.close(); |
| 165 return future.then((_) => targetAddStream()); |
| 166 } |
| 167 |
| 168 Future flush() { |
| 169 if (_isBound) { |
| 170 throw new StateError("StreamSink is bound to a stream"); |
| 171 } |
| 172 if (_controllerInstance == null) return new Future.value(this); |
| 173 // Adding an empty stream-controller will return a future that will complete |
| 174 // when all data is done. |
| 175 _isBound = true; |
| 176 var future = _controllerCompleter.future; |
| 177 _controllerInstance.close(); |
| 178 return future.whenComplete(() { |
| 179 _isBound = false; |
| 180 }); |
| 181 } |
| 182 |
| 183 Future close() { |
| 184 if (_isBound) { |
| 185 throw new StateError("StreamSink is bound to a stream"); |
| 186 } |
| 187 if (!_isClosed) { |
| 188 _isClosed = true; |
| 189 if (_controllerInstance != null) { |
| 190 _controllerInstance.close(); |
| 191 } else { |
| 192 _closeTarget(); |
| 193 } |
| 194 } |
| 195 return done; |
| 196 } |
| 197 |
| 198 void _closeTarget() { |
| 199 _target.close() |
| 200 .then((value) => _completeDone(value: value), |
| 201 onError: (error) => _completeDone(error: error)); |
| 202 } |
| 203 |
| 204 Future get done => _doneFuture; |
| 205 |
| 206 void _completeDone({value, error}) { |
| 207 if (_doneCompleter == null) return; |
| 208 if (error == null) { |
| 209 _doneCompleter.complete(value); |
| 210 } else { |
| 211 _hasError = true; |
| 212 _doneCompleter.completeError(error); |
| 213 } |
| 214 _doneCompleter = null; |
| 215 } |
| 216 |
| 217 StreamController<T> get _controller { |
| 218 if (_isBound) { |
| 219 throw new StateError("StreamSink is bound to a stream"); |
| 220 } |
| 221 if (_isClosed) { |
| 222 throw new StateError("StreamSink is closed"); |
| 223 } |
| 224 if (_controllerInstance == null) { |
| 225 _controllerInstance = new StreamController<T>(sync: true); |
| 226 _controllerCompleter = new Completer(); |
| 227 _target.addStream(_controller.stream) |
| 228 .then( |
| 229 (_) { |
| 230 if (_isBound) { |
| 231 // A new stream takes over - forward values to that stream. |
| 232 _controllerCompleter.complete(this); |
| 233 _controllerCompleter = null; |
| 234 _controllerInstance = null; |
| 235 } else { |
| 236 // No new stream, .close was called. Close _target. |
| 237 _closeTarget(); |
| 238 } |
| 239 }, |
| 240 onError: (error) { |
| 241 if (_isBound) { |
| 242 // A new stream takes over - forward errors to that stream. |
| 243 _controllerCompleter.completeError(error); |
| 244 _controllerCompleter = null; |
| 245 _controllerInstance = null; |
| 246 } else { |
| 247 // No new stream. No need to close target, as it have already |
| 248 // failed. |
| 249 _completeDone(error: error); |
| 250 } |
| 251 }); |
| 252 } |
| 253 return _controllerInstance; |
| 254 } |
| 255 } |
| 256 |
| 257 |
| 258 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { |
129 Encoding _encoding; | 259 Encoding _encoding; |
130 bool _encodingMutable = true; | 260 bool _encodingMutable = true; |
131 | 261 |
132 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) | 262 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) |
133 : super(target); | 263 : super(target); |
134 | 264 |
135 Encoding get encoding => _encoding; | 265 Encoding get encoding => _encoding; |
136 | 266 |
137 void set encoding(Encoding value) { | 267 void set encoding(Encoding value) { |
138 if (!_encodingMutable) { | 268 if (!_encodingMutable) { |
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
176 | 306 |
177 void writeln([Object obj = ""]) { | 307 void writeln([Object obj = ""]) { |
178 write(obj); | 308 write(obj); |
179 write("\n"); | 309 write("\n"); |
180 } | 310 } |
181 | 311 |
182 void writeCharCode(int charCode) { | 312 void writeCharCode(int charCode) { |
183 write(new String.fromCharCode(charCode)); | 313 write(new String.fromCharCode(charCode)); |
184 } | 314 } |
185 } | 315 } |
OLD | NEW |