Chromium Code Reviews| OLD | NEW |
|---|---|
| 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
| 2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
| 3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
| 4 | 4 |
| 5 part of dart.io; | 5 part of dart.io; |
| 6 | 6 |
| 7 /** | 7 /** |
| 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
| 9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
| 10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], | 10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], |
|
Søren Gjesse
2013/04/15 06:56:30
[write], [writeAll], [writeln], [writeCharCode] =>
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 11 * [writeCharCode] and [add] and will delay a [consume] or | 11 * [writeCharCode] and [add] and will delay a [addStream] until |
| 12 * [writeStream] until the buffer is flushed. | 12 * the buffer is flushed. |
| 13 * | 13 * |
| 14 * When the [IOSink] is bound to a stream (through either [consume] | 14 * When the [IOSink] is bound to a stream (through [addStream]) any call |
| 15 * or [writeStream]) any call to the [IOSink] will throw a | 15 * to the [IOSink] will throw a [StateError]. |
|
Søren Gjesse
2013/04/15 06:56:30
Maybe mention that when the addStream future compl
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 16 * [StateError]. | |
| 17 */ | 16 */ |
| 18 abstract class IOSink<T> | 17 abstract class IOSink |
| 19 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { | 18 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { |
| 20 factory IOSink(StreamConsumer<List<int>> target, | 19 factory IOSink(StreamConsumer<List<int>> target, |
| 21 {Encoding encoding: Encoding.UTF_8}) | 20 {Encoding encoding: Encoding.UTF_8}) |
| 22 => new _IOSinkImpl(target, encoding); | 21 => new _IOSinkImpl(target, encoding); |
| 23 | 22 |
| 24 /** | 23 /** |
| 25 * The [Encoding] used when writing strings. Depending on the | 24 * The [Encoding] used when writing strings. Depending on the |
| 26 * underlying consumer this property might be mutable. | 25 * underlying consumer this property might be mutable. |
| 27 */ | 26 */ |
| 28 Encoding encoding; | 27 Encoding encoding; |
| 29 | 28 |
| 30 /** | 29 /** |
| 31 * Writes the bytes uninterpreted to the consumer. | 30 * Writes the bytes uninterpreted to the consumer. |
| 32 */ | 31 */ |
| 33 void add(List<int> data); | 32 void add(List<int> data); |
| 34 | 33 |
| 35 /** | 34 /** |
| 36 * Writes an error to the consumer. | 35 * Writes an error to the consumer. |
| 37 */ | 36 */ |
| 38 void addError(AsyncError error); | 37 void addError(AsyncError error); |
| 39 | 38 |
| 40 /** | 39 /** |
| 41 * Provide functionality for piping to the [IOSink]. | |
| 42 */ | |
| 43 Future<T> consume(Stream<List<int>> stream); | |
| 44 | |
| 45 /** | |
| 46 * Adds all elements of the given [stream] to `this`. | 40 * Adds all elements of the given [stream] to `this`. |
| 47 */ | 41 */ |
| 48 Future<T> addStream(Stream<List<int>> stream); | 42 Future addStream(Stream<List<int>> stream); |
| 49 | |
| 50 /** | |
| 51 * Like [consume], but will not close the target when done. | |
| 52 * | |
| 53 * *Deprecated*: use [addStream] instead. | |
| 54 */ | |
| 55 Future<T> writeStream(Stream<List<int>> stream); | |
| 56 | 43 |
| 57 /** | 44 /** |
| 58 * Close the target. | 45 * Close the target. |
| 59 */ | 46 */ |
| 60 // TODO(floitsch): Currently the future cannot be typed because it has | |
| 61 // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest. | |
| 62 Future close(); | 47 Future close(); |
| 63 | 48 |
| 64 /** | 49 /** |
| 65 * Get future that will complete when all data has been written to | 50 * Get a future that will complete when all synchronous have completed, or an |
| 66 * the IOSink and it has been closed. | 51 * error happened. This future is identical to the future returned from close. |
| 67 */ | 52 */ |
| 68 Future<T> get done; | 53 Future get done; |
| 69 } | 54 } |
| 70 | 55 |
| 71 | 56 |
| 72 class _IOSinkImpl<T> implements IOSink<T> { | 57 class _IOSinkImpl implements IOSink { |
| 73 final StreamConsumer<List<int>> _target; | 58 final StreamConsumer<List<int>> _target; |
| 74 | 59 Completer _doneCompleter = new Completer(); |
| 75 Completer _writeStreamCompleter; | 60 Future _doneFuture; |
| 76 StreamController<List<int>> _controllerInstance; | 61 StreamController<List<int>> _controllerInstance; |
| 77 Future<T> _pipeFuture; | 62 Completer _controllerCompleter; |
| 78 StreamSubscription<List<int>> _bindSubscription; | 63 Encoding _encoding; |
| 79 bool _paused = true; | 64 bool _isClosed = false; |
| 65 bool _isBound = false; | |
| 80 bool _encodingMutable = true; | 66 bool _encodingMutable = true; |
| 81 | 67 |
| 82 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding); | 68 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) { |
| 83 | 69 _doneFuture = _doneCompleter.future; |
| 84 Encoding _encoding; | 70 } |
| 85 | 71 |
| 86 Encoding get encoding => _encoding; | 72 Encoding get encoding => _encoding; |
| 87 | 73 |
| 88 void set encoding(Encoding value) { | 74 void set encoding(Encoding value) { |
| 89 if (!_encodingMutable) { | 75 if (!_encodingMutable) { |
| 90 throw new StateError("IOSink encoding is not mutable"); | 76 throw new StateError("IOSink encoding is not mutable"); |
| 91 } | 77 } |
| 92 _encoding = value; | 78 _encoding = value; |
| 93 } | 79 } |
| 94 | 80 |
| (...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
| 128 void writeln([Object obj = ""]) { | 114 void writeln([Object obj = ""]) { |
| 129 write(obj); | 115 write(obj); |
| 130 write("\n"); | 116 write("\n"); |
| 131 } | 117 } |
| 132 | 118 |
| 133 void writeCharCode(int charCode) { | 119 void writeCharCode(int charCode) { |
| 134 write(new String.fromCharCode(charCode)); | 120 write(new String.fromCharCode(charCode)); |
| 135 } | 121 } |
| 136 | 122 |
| 137 void add(List<int> data) { | 123 void add(List<int> data) { |
| 138 if (_isBound) { | |
| 139 throw new StateError("IOSink is already bound to a stream"); | |
| 140 } | |
| 141 _controller.add(data); | 124 _controller.add(data); |
| 142 } | 125 } |
| 143 | 126 |
| 144 void addError(AsyncError error) { | 127 void addError(AsyncError error) { |
| 128 _controller.addError(error); | |
| 129 } | |
| 130 | |
| 131 Future addStream(Stream<List<int>> stream) { | |
| 145 if (_isBound) { | 132 if (_isBound) { |
| 146 throw new StateError("IOSink is already bound to a stream"); | 133 throw new StateError("IOSink is already bound to a stream"); |
| 147 } | 134 } |
| 148 _controller.addError(error); | 135 _isBound = true; |
|
Søren Gjesse
2013/04/15 06:56:30
This looks good. Maybe add a comment like "wait fo
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 136 return _closeController().then((_) { | |
| 137 return _target.addStream(stream) | |
| 138 .whenComplete(() { | |
| 139 _isBound = false; | |
| 140 }); | |
| 141 }); | |
| 149 } | 142 } |
| 150 | 143 |
| 151 Future<T> consume(Stream<List<int>> stream) { | 144 Future _closeController() { |
| 152 if (_isBound) { | 145 if (_controllerInstance == null) return new Future.immediate(null); |
| 153 throw new StateError("IOSink is already bound to a stream"); | 146 var future = _controllerCompleter.future; |
| 154 } | 147 _controllerInstance.close(); |
| 155 return _fillFromStream(stream); | 148 return future; |
| 156 } | |
| 157 | |
| 158 Future<T> writeStream(Stream<List<int>> stream) { | |
| 159 return addStream(stream); | |
| 160 } | |
| 161 | |
| 162 Future<T> addStream(Stream<List<int>> stream) { | |
| 163 if (_isBound) { | |
| 164 throw new StateError("IOSink is already bound to a stream"); | |
| 165 } | |
| 166 return _fillFromStream(stream, unbind: true); | |
| 167 } | 149 } |
| 168 | 150 |
| 169 Future close() { | 151 Future close() { |
| 170 if (_isBound) { | 152 if (_isBound) { |
| 171 throw new StateError("IOSink is already bound to a stream"); | 153 throw new StateError("IOSink is already bound to a stream"); |
|
Søren Gjesse
2013/04/15 06:56:30
Maybe the "already" in this message is confusing.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 172 } | 154 } |
| 173 _controller.close(); | 155 if (!_isClosed) { |
| 174 return _pipeFuture; | 156 _isClosed = true; |
| 157 if (_controllerInstance != null) { | |
| 158 _controllerInstance.close(); | |
| 159 } else { | |
| 160 _closeTarget(); | |
| 161 } | |
| 162 } | |
| 163 return done; | |
| 175 } | 164 } |
| 176 | 165 |
| 177 Future<T> get done { | 166 void _closeTarget() { |
| 178 _controller; | 167 _target.close() |
| 179 return _pipeFuture; | 168 .then((_) => _completeDone(), |
| 169 onError: (error) => _completeDone(error)); | |
| 180 } | 170 } |
| 181 | 171 |
| 182 void _completeWriteStreamCompleter([error]) { | 172 Future get done => _doneFuture; |
| 183 if (_writeStreamCompleter == null) return; | 173 |
| 184 var tmp = _writeStreamCompleter; | 174 void _completeDone([error]) { |
| 185 _writeStreamCompleter = null; | 175 if (_doneCompleter == null) return; |
| 176 var tmp = _doneCompleter; | |
| 177 _doneCompleter = null; | |
| 186 if (error == null) { | 178 if (error == null) { |
| 187 _bindSubscription = null; | |
| 188 tmp.complete(); | 179 tmp.complete(); |
| 189 } else { | 180 } else { |
| 190 tmp.completeError(error); | 181 tmp.completeError(error); |
| 191 } | 182 } |
| 192 } | 183 } |
| 193 | 184 |
| 194 StreamController<List<int>> get _controller { | 185 StreamController<List<int>> get _controller { |
| 186 if (_isBound) { | |
| 187 throw new StateError("IOSink is already bound to a stream"); | |
|
Søren Gjesse
2013/04/15 06:56:30
Remove "already" here as well.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 188 } | |
| 189 if (_isClosed) { | |
| 190 throw new StateError("IOSink is already closed"); | |
|
Søren Gjesse
2013/04/15 06:56:30
And here.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 191 } | |
| 195 if (_controllerInstance == null) { | 192 if (_controllerInstance == null) { |
|
Søren Gjesse
2013/04/15 06:56:30
This also looks nice!
Anders Johnsen
2013/04/15 07:35:20
Thanks :)
| |
| 196 _controllerInstance = new StreamController<List<int>>( | 193 _controllerInstance = new StreamController<List<int>>(); |
| 197 onPauseStateChange: _onPauseStateChange, | 194 _controllerCompleter = new Completer(); |
| 198 onSubscriptionStateChange: _onSubscriptionStateChange); | 195 _target.addStream(_controller.stream) |
| 199 var future = _controller.stream.pipe(_target); | 196 .then( |
| 200 future.then((_) => _completeWriteStreamCompleter(), | 197 (_) { |
| 201 onError: (error) => _completeWriteStreamCompleter(error)); | 198 if (_isBound) { |
| 202 _pipeFuture = future.then((value) => value); | 199 // A new stream takes over - forward errors to that stream. |
|
Søren Gjesse
2013/04/15 06:56:30
I think this comment belongs in the onError case.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
| 200 var completer = _controllerCompleter; | |
| 201 _controllerCompleter = null; | |
| 202 _controllerInstance = null; | |
| 203 completer.complete(); | |
| 204 } else { | |
| 205 // No new stream, .close was called. Close _target. | |
| 206 _closeTarget(); | |
| 207 } | |
| 208 }, | |
| 209 onError: (error) { | |
| 210 if (_isBound) { | |
| 211 var completer = _controllerCompleter; | |
| 212 _controllerCompleter = null; | |
| 213 _controllerInstance = null; | |
| 214 completer.completeError(error); | |
| 215 } else { | |
| 216 _completeDone(error); | |
| 217 } | |
| 218 }); | |
| 203 } | 219 } |
| 204 return _controllerInstance; | 220 return _controllerInstance; |
| 205 } | 221 } |
| 206 | |
| 207 bool get _isBound => _bindSubscription != null; | |
| 208 | |
| 209 void _onPauseStateChange() { | |
| 210 _paused = _controller.isPaused; | |
| 211 if (_controller.isPaused) { | |
| 212 _pause(); | |
| 213 } else { | |
| 214 _resume(); | |
| 215 } | |
| 216 } | |
| 217 | |
| 218 void _pause() { | |
| 219 if (_bindSubscription != null) { | |
| 220 try { | |
| 221 // The subscription can be canceled at this point. | |
| 222 _bindSubscription.pause(); | |
| 223 } catch (e) { | |
| 224 } | |
| 225 } | |
| 226 } | |
| 227 | |
| 228 void _resume() { | |
| 229 if (_bindSubscription != null) { | |
| 230 try { | |
| 231 // The subscription can be canceled at this point. | |
| 232 _bindSubscription.resume(); | |
| 233 } catch (e) { | |
| 234 } | |
| 235 } | |
| 236 } | |
| 237 | |
| 238 void _onSubscriptionStateChange() { | |
| 239 if (_controller.hasListener) { | |
| 240 _paused = false; | |
| 241 _resume(); | |
| 242 } else { | |
| 243 if (_bindSubscription != null) { | |
| 244 _bindSubscription.cancel(); | |
| 245 _bindSubscription = null; | |
| 246 } | |
| 247 } | |
| 248 } | |
| 249 | |
| 250 Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { | |
| 251 _controller; | |
| 252 assert(_writeStreamCompleter == null); | |
| 253 if (unbind) { | |
| 254 _writeStreamCompleter = new Completer<T>(); | |
| 255 } | |
| 256 _bindSubscription = stream.listen( | |
| 257 _controller.add, | |
| 258 onDone: () { | |
| 259 if (unbind) { | |
| 260 _completeWriteStreamCompleter(); | |
| 261 } else { | |
| 262 _controller.close(); | |
| 263 } | |
| 264 }, | |
| 265 onError: _controller.addError); | |
| 266 if (_paused) _pause(); | |
| 267 if (unbind) { | |
| 268 return _writeStreamCompleter.future; | |
| 269 } else { | |
| 270 return _pipeFuture; | |
| 271 } | |
| 272 } | |
| 273 } | 222 } |
| OLD | NEW |