OLD | NEW |
---|---|
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], | 10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], |
Søren Gjesse
2013/04/15 06:56:30
[write], [writeAll], [writeln], [writeCharCode] =>
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
11 * [writeCharCode] and [add] and will delay a [consume] or | 11 * [writeCharCode] and [add] and will delay a [addStream] until |
12 * [writeStream] until the buffer is flushed. | 12 * the buffer is flushed. |
13 * | 13 * |
14 * When the [IOSink] is bound to a stream (through either [consume] | 14 * When the [IOSink] is bound to a stream (through [addStream]) any call |
15 * or [writeStream]) any call to the [IOSink] will throw a | 15 * to the [IOSink] will throw a [StateError]. |
Søren Gjesse
2013/04/15 06:56:30
Maybe mention that when the addStream future compl
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
16 * [StateError]. | |
17 */ | 16 */ |
18 abstract class IOSink<T> | 17 abstract class IOSink |
19 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { | 18 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { |
20 factory IOSink(StreamConsumer<List<int>> target, | 19 factory IOSink(StreamConsumer<List<int>> target, |
21 {Encoding encoding: Encoding.UTF_8}) | 20 {Encoding encoding: Encoding.UTF_8}) |
22 => new _IOSinkImpl(target, encoding); | 21 => new _IOSinkImpl(target, encoding); |
23 | 22 |
24 /** | 23 /** |
25 * The [Encoding] used when writing strings. Depending on the | 24 * The [Encoding] used when writing strings. Depending on the |
26 * underlying consumer this property might be mutable. | 25 * underlying consumer this property might be mutable. |
27 */ | 26 */ |
28 Encoding encoding; | 27 Encoding encoding; |
29 | 28 |
30 /** | 29 /** |
31 * Writes the bytes uninterpreted to the consumer. | 30 * Writes the bytes uninterpreted to the consumer. |
32 */ | 31 */ |
33 void add(List<int> data); | 32 void add(List<int> data); |
34 | 33 |
35 /** | 34 /** |
36 * Writes an error to the consumer. | 35 * Writes an error to the consumer. |
37 */ | 36 */ |
38 void addError(AsyncError error); | 37 void addError(AsyncError error); |
39 | 38 |
40 /** | 39 /** |
41 * Provide functionality for piping to the [IOSink]. | |
42 */ | |
43 Future<T> consume(Stream<List<int>> stream); | |
44 | |
45 /** | |
46 * Adds all elements of the given [stream] to `this`. | 40 * Adds all elements of the given [stream] to `this`. |
47 */ | 41 */ |
48 Future<T> addStream(Stream<List<int>> stream); | 42 Future addStream(Stream<List<int>> stream); |
49 | |
50 /** | |
51 * Like [consume], but will not close the target when done. | |
52 * | |
53 * *Deprecated*: use [addStream] instead. | |
54 */ | |
55 Future<T> writeStream(Stream<List<int>> stream); | |
56 | 43 |
57 /** | 44 /** |
58 * Close the target. | 45 * Close the target. |
59 */ | 46 */ |
60 // TODO(floitsch): Currently the future cannot be typed because it has | |
61 // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest. | |
62 Future close(); | 47 Future close(); |
63 | 48 |
64 /** | 49 /** |
65 * Get future that will complete when all data has been written to | 50 * Get a future that will complete when all synchronous have completed, or an |
66 * the IOSink and it has been closed. | 51 * error happened. This future is identical to the future returned from close. |
67 */ | 52 */ |
68 Future<T> get done; | 53 Future get done; |
69 } | 54 } |
70 | 55 |
71 | 56 |
72 class _IOSinkImpl<T> implements IOSink<T> { | 57 class _IOSinkImpl implements IOSink { |
73 final StreamConsumer<List<int>> _target; | 58 final StreamConsumer<List<int>> _target; |
74 | 59 Completer _doneCompleter = new Completer(); |
75 Completer _writeStreamCompleter; | 60 Future _doneFuture; |
76 StreamController<List<int>> _controllerInstance; | 61 StreamController<List<int>> _controllerInstance; |
77 Future<T> _pipeFuture; | 62 Completer _controllerCompleter; |
78 StreamSubscription<List<int>> _bindSubscription; | 63 Encoding _encoding; |
79 bool _paused = true; | 64 bool _isClosed = false; |
65 bool _isBound = false; | |
80 bool _encodingMutable = true; | 66 bool _encodingMutable = true; |
81 | 67 |
82 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding); | 68 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) { |
83 | 69 _doneFuture = _doneCompleter.future; |
84 Encoding _encoding; | 70 } |
85 | 71 |
86 Encoding get encoding => _encoding; | 72 Encoding get encoding => _encoding; |
87 | 73 |
88 void set encoding(Encoding value) { | 74 void set encoding(Encoding value) { |
89 if (!_encodingMutable) { | 75 if (!_encodingMutable) { |
90 throw new StateError("IOSink encoding is not mutable"); | 76 throw new StateError("IOSink encoding is not mutable"); |
91 } | 77 } |
92 _encoding = value; | 78 _encoding = value; |
93 } | 79 } |
94 | 80 |
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... | |
128 void writeln([Object obj = ""]) { | 114 void writeln([Object obj = ""]) { |
129 write(obj); | 115 write(obj); |
130 write("\n"); | 116 write("\n"); |
131 } | 117 } |
132 | 118 |
133 void writeCharCode(int charCode) { | 119 void writeCharCode(int charCode) { |
134 write(new String.fromCharCode(charCode)); | 120 write(new String.fromCharCode(charCode)); |
135 } | 121 } |
136 | 122 |
137 void add(List<int> data) { | 123 void add(List<int> data) { |
138 if (_isBound) { | |
139 throw new StateError("IOSink is already bound to a stream"); | |
140 } | |
141 _controller.add(data); | 124 _controller.add(data); |
142 } | 125 } |
143 | 126 |
144 void addError(AsyncError error) { | 127 void addError(AsyncError error) { |
128 _controller.addError(error); | |
129 } | |
130 | |
131 Future addStream(Stream<List<int>> stream) { | |
145 if (_isBound) { | 132 if (_isBound) { |
146 throw new StateError("IOSink is already bound to a stream"); | 133 throw new StateError("IOSink is already bound to a stream"); |
147 } | 134 } |
148 _controller.addError(error); | 135 _isBound = true; |
Søren Gjesse
2013/04/15 06:56:30
This looks good. Maybe add a comment like "wait fo
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
136 return _closeController().then((_) { | |
137 return _target.addStream(stream) | |
138 .whenComplete(() { | |
139 _isBound = false; | |
140 }); | |
141 }); | |
149 } | 142 } |
150 | 143 |
151 Future<T> consume(Stream<List<int>> stream) { | 144 Future _closeController() { |
152 if (_isBound) { | 145 if (_controllerInstance == null) return new Future.immediate(null); |
153 throw new StateError("IOSink is already bound to a stream"); | 146 var future = _controllerCompleter.future; |
154 } | 147 _controllerInstance.close(); |
155 return _fillFromStream(stream); | 148 return future; |
156 } | |
157 | |
158 Future<T> writeStream(Stream<List<int>> stream) { | |
159 return addStream(stream); | |
160 } | |
161 | |
162 Future<T> addStream(Stream<List<int>> stream) { | |
163 if (_isBound) { | |
164 throw new StateError("IOSink is already bound to a stream"); | |
165 } | |
166 return _fillFromStream(stream, unbind: true); | |
167 } | 149 } |
168 | 150 |
169 Future close() { | 151 Future close() { |
170 if (_isBound) { | 152 if (_isBound) { |
171 throw new StateError("IOSink is already bound to a stream"); | 153 throw new StateError("IOSink is already bound to a stream"); |
Søren Gjesse
2013/04/15 06:56:30
Maybe the "already" in this message is confusing.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
172 } | 154 } |
173 _controller.close(); | 155 if (!_isClosed) { |
174 return _pipeFuture; | 156 _isClosed = true; |
157 if (_controllerInstance != null) { | |
158 _controllerInstance.close(); | |
159 } else { | |
160 _closeTarget(); | |
161 } | |
162 } | |
163 return done; | |
175 } | 164 } |
176 | 165 |
177 Future<T> get done { | 166 void _closeTarget() { |
178 _controller; | 167 _target.close() |
179 return _pipeFuture; | 168 .then((_) => _completeDone(), |
169 onError: (error) => _completeDone(error)); | |
180 } | 170 } |
181 | 171 |
182 void _completeWriteStreamCompleter([error]) { | 172 Future get done => _doneFuture; |
183 if (_writeStreamCompleter == null) return; | 173 |
184 var tmp = _writeStreamCompleter; | 174 void _completeDone([error]) { |
185 _writeStreamCompleter = null; | 175 if (_doneCompleter == null) return; |
176 var tmp = _doneCompleter; | |
177 _doneCompleter = null; | |
186 if (error == null) { | 178 if (error == null) { |
187 _bindSubscription = null; | |
188 tmp.complete(); | 179 tmp.complete(); |
189 } else { | 180 } else { |
190 tmp.completeError(error); | 181 tmp.completeError(error); |
191 } | 182 } |
192 } | 183 } |
193 | 184 |
194 StreamController<List<int>> get _controller { | 185 StreamController<List<int>> get _controller { |
186 if (_isBound) { | |
187 throw new StateError("IOSink is already bound to a stream"); | |
Søren Gjesse
2013/04/15 06:56:30
Remove "already" here as well.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
188 } | |
189 if (_isClosed) { | |
190 throw new StateError("IOSink is already closed"); | |
Søren Gjesse
2013/04/15 06:56:30
And here.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
191 } | |
195 if (_controllerInstance == null) { | 192 if (_controllerInstance == null) { |
Søren Gjesse
2013/04/15 06:56:30
This also looks nice!
Anders Johnsen
2013/04/15 07:35:20
Thanks :)
| |
196 _controllerInstance = new StreamController<List<int>>( | 193 _controllerInstance = new StreamController<List<int>>(); |
197 onPauseStateChange: _onPauseStateChange, | 194 _controllerCompleter = new Completer(); |
198 onSubscriptionStateChange: _onSubscriptionStateChange); | 195 _target.addStream(_controller.stream) |
199 var future = _controller.stream.pipe(_target); | 196 .then( |
200 future.then((_) => _completeWriteStreamCompleter(), | 197 (_) { |
201 onError: (error) => _completeWriteStreamCompleter(error)); | 198 if (_isBound) { |
202 _pipeFuture = future.then((value) => value); | 199 // A new stream takes over - forward errors to that stream. |
Søren Gjesse
2013/04/15 06:56:30
I think this comment belongs in the onError case.
Anders Johnsen
2013/04/15 07:35:20
Done.
| |
200 var completer = _controllerCompleter; | |
201 _controllerCompleter = null; | |
202 _controllerInstance = null; | |
203 completer.complete(); | |
204 } else { | |
205 // No new stream, .close was called. Close _target. | |
206 _closeTarget(); | |
207 } | |
208 }, | |
209 onError: (error) { | |
210 if (_isBound) { | |
211 var completer = _controllerCompleter; | |
212 _controllerCompleter = null; | |
213 _controllerInstance = null; | |
214 completer.completeError(error); | |
215 } else { | |
216 _completeDone(error); | |
217 } | |
218 }); | |
203 } | 219 } |
204 return _controllerInstance; | 220 return _controllerInstance; |
205 } | 221 } |
206 | |
207 bool get _isBound => _bindSubscription != null; | |
208 | |
209 void _onPauseStateChange() { | |
210 _paused = _controller.isPaused; | |
211 if (_controller.isPaused) { | |
212 _pause(); | |
213 } else { | |
214 _resume(); | |
215 } | |
216 } | |
217 | |
218 void _pause() { | |
219 if (_bindSubscription != null) { | |
220 try { | |
221 // The subscription can be canceled at this point. | |
222 _bindSubscription.pause(); | |
223 } catch (e) { | |
224 } | |
225 } | |
226 } | |
227 | |
228 void _resume() { | |
229 if (_bindSubscription != null) { | |
230 try { | |
231 // The subscription can be canceled at this point. | |
232 _bindSubscription.resume(); | |
233 } catch (e) { | |
234 } | |
235 } | |
236 } | |
237 | |
238 void _onSubscriptionStateChange() { | |
239 if (_controller.hasListener) { | |
240 _paused = false; | |
241 _resume(); | |
242 } else { | |
243 if (_bindSubscription != null) { | |
244 _bindSubscription.cancel(); | |
245 _bindSubscription = null; | |
246 } | |
247 } | |
248 } | |
249 | |
250 Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) { | |
251 _controller; | |
252 assert(_writeStreamCompleter == null); | |
253 if (unbind) { | |
254 _writeStreamCompleter = new Completer<T>(); | |
255 } | |
256 _bindSubscription = stream.listen( | |
257 _controller.add, | |
258 onDone: () { | |
259 if (unbind) { | |
260 _completeWriteStreamCompleter(); | |
261 } else { | |
262 _controller.close(); | |
263 } | |
264 }, | |
265 onError: _controller.addError); | |
266 if (_paused) _pause(); | |
267 if (unbind) { | |
268 return _writeStreamCompleter.future; | |
269 } else { | |
270 return _pipeFuture; | |
271 } | |
272 } | |
273 } | 222 } |
OLD | NEW |