OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * A combined byte and text output. |
9 * utility functions for writing to the StreamConsumer directly. The | |
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | |
11 * an [addStream] until the buffer is flushed. | |
12 * | 9 * |
13 * When the [IOSink] is bound to a stream (through [addStream]) any call | 10 * An [IOSink] combines a [StreamSink] of bytes with a [StringSink], |
14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, | 11 * and allows easy output of both bytes and text. |
15 * the [IOSink] will again be open for all calls. | 12 * |
| 13 * Writing text ([write]) and adding bytes ([add]) may be interleaved freely. |
| 14 * |
| 15 * While a stream is being added using [addStream], any further attempts |
| 16 * to add or write to the [IOSink] will fail until the [addStream] completes. |
16 * | 17 * |
17 * If data is added to the [IOSink] after the sink is closed, the data will be | 18 * If data is added to the [IOSink] after the sink is closed, the data will be |
18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. | 19 * ignored. Use the [done] future to be notified when the [IOSink] is closed. |
19 */ | 20 */ |
20 abstract class IOSink implements StreamSink<List<int>>, StringSink { | 21 abstract class IOSink implements StreamSink<List<int>>, StringSink { |
21 // TODO(ajohnsen): Make _encodingMutable an argument. | 22 |
| 23 /** |
| 24 * Create an [IOSink] that outputs to a [target] [StreamConsumer] of bytes. |
| 25 * |
| 26 * Text written to [StreamSink] methods is encoded to bytes using [encoding] |
| 27 * before being output on [target]. |
| 28 */ |
22 factory IOSink(StreamConsumer<List<int>> target, | 29 factory IOSink(StreamConsumer<List<int>> target, |
23 {Encoding encoding: UTF8}) | 30 {Encoding encoding: UTF8}) |
24 => new _IOSinkImpl(target, encoding); | 31 => new _IOSinkImpl(target, encoding); |
25 | 32 |
26 /** | 33 /** |
27 * The [Encoding] used when writing strings. Depending on the | 34 * The [Encoding] used when writing strings. Depending on the |
28 * underlying consumer this property might be mutable. | 35 * underlying consumer this property might be mutable. |
29 */ | 36 */ |
30 Encoding encoding; | 37 Encoding encoding; |
31 | 38 |
32 /** | 39 /** |
33 * Adds [data] to the target consumer, ignoring [encoding]. | 40 * Adds byte [data] to the target consumer, ignoring [encoding]. |
34 * | 41 * |
35 * The [encoding] does not apply to this method, and the `data` list is passed | 42 * The [encoding] does not apply to this method, and the `data` list is passed |
36 * directly to the target consumer as a stream event. | 43 * directly to the target consumer as a stream event. |
37 * | 44 * |
38 * This function must not be called when a stream is currently being added | 45 * This function must not be called when a stream is currently being added |
39 * using [addStream]. | 46 * using [addStream]. |
40 * | 47 * |
41 * This operation is non-blocking. See [flush] or [done] for how to get any | 48 * This operation is non-blocking. See [flush] or [done] for how to get any |
42 * errors generated by this call. | 49 * errors generated by this call. |
43 * | 50 * |
44 * The data list should not be modified after it has been passed to `add`. | 51 * The data list should not be modified after it has been passed to `add`. |
45 */ | 52 */ |
46 void add(List<int> data); | 53 void add(List<int> data); |
47 | 54 |
48 /** | 55 /** |
49 * Converts [obj] to a String by invoking [Object.toString] and | 56 * Converts [obj] to a String by invoking [Object.toString] and |
50 * [add]s the encoding of the result to the target consumer. | 57 * [add]s the encoding of the result to the target consumer. |
51 * | 58 * |
52 * This operation is non-blocking. See [flush] or [done] for how to get any | 59 * This operation is non-blocking. See [flush] or [done] for how to get any |
53 * errors generated by this call. | 60 * errors generated by this call. |
54 */ | 61 */ |
55 void write(Object obj); | 62 void write(Object obj); |
56 | 63 |
57 /** | 64 /** |
58 * Iterates over the given [objects] and [write]s them in sequence. | 65 * Iterates over the given [objects] and [write]s them in sequence. |
59 * | 66 * |
60 * If [separator] is provided, a `write` with the `separator` is performed | 67 * If [separator] is provided, a `write` with the `separator` is performed |
61 * between any two elements of `objects`. | 68 * between any two elements of objects`. |
62 * | 69 * |
63 * This operation is non-blocking. See [flush] or [done] for how to get any | 70 * This operation is non-blocking. See [flush] or [done] for how to get any |
64 * errors generated by this call. | 71 * errors generated by this call. |
65 */ | 72 */ |
66 void writeAll(Iterable objects, [String separator = ""]); | 73 void writeAll(Iterable objects, [String separator = ""]); |
67 | 74 |
68 /** | 75 /** |
69 * Converts [obj] to a String by invoking [Object.toString] and | 76 * Converts [obj] to a String by invoking [Object.toString] and |
70 * writes the result to `this`, followed by a newline. | 77 * writes the result to `this`, followed by a newline. |
71 * | 78 * |
72 * This operation is non-blocking. See [flush] or [done] for how to get any | 79 * This operation is non-blocking. See [flush] or [done] for how to get any |
73 * errors generated by this call. | 80 * errors generated by this call. |
74 */ | 81 */ |
75 void writeln([Object obj = ""]); | 82 void writeln([Object obj = ""]); |
76 | 83 |
77 /** | 84 /** |
78 * Writes the [charCode] to `this`. | 85 * Writes the character of [charCode]. |
79 * | 86 * |
80 * This method is equivalent to `write(new String.fromCharCode(charCode))`. | 87 * This method is equivalent to `write(new String.fromCharCode(charCode))`. |
81 * | 88 * |
82 * This operation is non-blocking. See [flush] or [done] for how to get any | 89 * This operation is non-blocking. See [flush] or [done] for how to get any |
83 * errors generated by this call. | 90 * errors generated by this call. |
84 */ | 91 */ |
85 void writeCharCode(int charCode); | 92 void writeCharCode(int charCode); |
86 | 93 |
87 /** | 94 /** |
88 * Passes the error to the target consumer as an error event. | 95 * Passes the error to the target consumer as an error event. |
(...skipping 11 matching lines...) Expand all Loading... |
100 * | 107 * |
101 * Returns a [Future] that completes when | 108 * Returns a [Future] that completes when |
102 * all elements of the given [stream] are added to `this`. | 109 * all elements of the given [stream] are added to `this`. |
103 */ | 110 */ |
104 Future addStream(Stream<List<int>> stream); | 111 Future addStream(Stream<List<int>> stream); |
105 | 112 |
106 /** | 113 /** |
107 * Returns a [Future] that completes once all buffered data is accepted by the | 114 * Returns a [Future] that completes once all buffered data is accepted by the |
108 * to underlying [StreamConsumer]. | 115 * to underlying [StreamConsumer]. |
109 * | 116 * |
110 * It's an error to call this method, while an [addStream] is incomplete. | 117 * This method must not be called while an [addStream] is incomplete. |
111 * | 118 * |
112 * NOTE: This is not necessarily the same as the data being flushed by the | 119 * NOTE: This is not necessarily the same as the data being flushed by the |
113 * operating system. | 120 * operating system. |
114 */ | 121 */ |
115 Future flush(); | 122 Future flush(); |
116 | 123 |
117 /** | 124 /** |
118 * Close the target consumer. | 125 * Close the target consumer. |
119 */ | 126 */ |
120 Future close(); | 127 Future close(); |
121 | 128 |
122 /** | 129 /** |
123 * Get a future that will complete when the consumer closes, or when an | 130 * Get a future that will complete when the consumer closes, or when an |
124 * error occurs. This future is identical to the future returned by | 131 * error occurs. This future is identical to the future returned by |
125 * [close]. | 132 * [close]. |
126 */ | 133 */ |
127 Future get done; | 134 Future get done; |
128 } | 135 } |
129 | 136 |
130 class _StreamSinkImpl<T> implements StreamSink<T> { | 137 class _StreamSinkImpl<T> implements StreamSink<T> { |
131 final StreamConsumer<T> _target; | 138 final StreamConsumer<T> _target; |
132 Completer _doneCompleter = new Completer(); | 139 final Completer _doneCompleter = new Completer(); |
133 Future _doneFuture; | |
134 StreamController<T> _controllerInstance; | 140 StreamController<T> _controllerInstance; |
135 Completer _controllerCompleter; | 141 Completer _controllerCompleter; |
136 bool _isClosed = false; | 142 bool _isClosed = false; |
137 bool _isBound = false; | 143 bool _isBound = false; |
138 bool _hasError = false; | 144 bool _hasError = false; |
139 | 145 |
140 _StreamSinkImpl(this._target) { | 146 _StreamSinkImpl(this._target); |
141 _doneFuture = _doneCompleter.future; | |
142 } | |
143 | 147 |
144 void add(T data) { | 148 void add(T data) { |
145 if (_isClosed) return; | 149 if (_isClosed) return; |
146 _controller.add(data); | 150 _controller.add(data); |
147 } | 151 } |
148 | 152 |
149 void addError(error, [StackTrace stackTrace]) { | 153 void addError(error, [StackTrace stackTrace]) { |
150 _controller.addError(error, stackTrace); | 154 _controller.addError(error, stackTrace); |
151 } | 155 } |
152 | 156 |
(...skipping 20 matching lines...) Expand all Loading... |
173 if (_isBound) { | 177 if (_isBound) { |
174 throw new StateError("StreamSink is bound to a stream"); | 178 throw new StateError("StreamSink is bound to a stream"); |
175 } | 179 } |
176 if (_controllerInstance == null) return new Future.value(this); | 180 if (_controllerInstance == null) return new Future.value(this); |
177 // Adding an empty stream-controller will return a future that will complete | 181 // Adding an empty stream-controller will return a future that will complete |
178 // when all data is done. | 182 // when all data is done. |
179 _isBound = true; | 183 _isBound = true; |
180 var future = _controllerCompleter.future; | 184 var future = _controllerCompleter.future; |
181 _controllerInstance.close(); | 185 _controllerInstance.close(); |
182 return future.whenComplete(() { | 186 return future.whenComplete(() { |
183 _isBound = false; | 187 _isBound = false; |
184 }); | 188 }); |
185 } | 189 } |
186 | 190 |
187 Future close() { | 191 Future close() { |
188 if (_isBound) { | 192 if (_isBound) { |
189 throw new StateError("StreamSink is bound to a stream"); | 193 throw new StateError("StreamSink is bound to a stream"); |
190 } | 194 } |
191 if (!_isClosed) { | 195 if (!_isClosed) { |
192 _isClosed = true; | 196 _isClosed = true; |
193 if (_controllerInstance != null) { | 197 if (_controllerInstance != null) { |
194 _controllerInstance.close(); | 198 _controllerInstance.close(); |
195 } else { | 199 } else { |
196 _closeTarget(); | 200 _closeTarget(); |
197 } | 201 } |
198 } | 202 } |
199 return done; | 203 return done; |
200 } | 204 } |
201 | 205 |
202 void _closeTarget() { | 206 void _closeTarget() { |
203 _target.close().then(_completeDoneValue, onError: _completeDoneError); | 207 _target.close().then(_completeDoneValue, onError: _completeDoneError); |
204 } | 208 } |
205 | 209 |
206 Future get done => _doneFuture; | 210 Future get done => _doneCompleter.future; |
207 | 211 |
208 void _completeDoneValue(value) { | 212 void _completeDoneValue(value) { |
209 if (_doneCompleter == null) return; | 213 if (!_doneCompleter.isCompleted) { |
210 _doneCompleter.complete(value); | 214 _doneCompleter.complete(value); |
211 _doneCompleter = null; | 215 } |
212 } | 216 } |
213 | 217 |
214 void _completeDoneError(error, StackTrace stackTrace) { | 218 void _completeDoneError(error, StackTrace stackTrace) { |
215 if (_doneCompleter == null) return; | 219 if (!_doneCompleter.isCompleted) { |
216 _hasError = true; | 220 _hasError = true; |
217 _doneCompleter.completeError(error, stackTrace); | 221 _doneCompleter.completeError(error, stackTrace); |
218 _doneCompleter = null; | 222 } |
219 } | 223 } |
220 | 224 |
221 StreamController<T> get _controller { | 225 StreamController<T> get _controller { |
222 if (_isBound) { | 226 if (_isBound) { |
223 throw new StateError("StreamSink is bound to a stream"); | 227 throw new StateError("StreamSink is bound to a stream"); |
224 } | 228 } |
225 if (_isClosed) { | 229 if (_isClosed) { |
226 throw new StateError("StreamSink is closed"); | 230 throw new StateError("StreamSink is closed"); |
227 } | 231 } |
228 if (_controllerInstance == null) { | 232 if (_controllerInstance == null) { |
229 _controllerInstance = new StreamController<T>(sync: true); | 233 _controllerInstance = new StreamController<T>(sync: true); |
230 _controllerCompleter = new Completer(); | 234 _controllerCompleter = new Completer(); |
231 _target.addStream(_controller.stream) | 235 _target.addStream(_controller.stream).then((_) { |
232 .then( | 236 if (_isBound) { |
233 (_) { | 237 // A new stream takes over - forward values to that stream. |
234 if (_isBound) { | 238 _controllerCompleter.complete(this); |
235 // A new stream takes over - forward values to that stream. | 239 _controllerCompleter = null; |
236 _controllerCompleter.complete(this); | 240 _controllerInstance = null; |
237 _controllerCompleter = null; | 241 } else { |
238 _controllerInstance = null; | 242 // No new stream, .close was called. Close _target. |
239 } else { | 243 _closeTarget(); |
240 // No new stream, .close was called. Close _target. | 244 } |
241 _closeTarget(); | 245 }, onError: (error, stackTrace) { |
242 } | 246 if (_isBound) { |
243 }, | 247 // A new stream takes over - forward errors to that stream. |
244 onError: (error, stackTrace) { | 248 _controllerCompleter.completeError(error, stackTrace); |
245 if (_isBound) { | 249 _controllerCompleter = null; |
246 // A new stream takes over - forward errors to that stream. | 250 _controllerInstance = null; |
247 _controllerCompleter.completeError(error, stackTrace); | 251 } else { |
248 _controllerCompleter = null; | 252 // No new stream. No need to close target, as it has already |
249 _controllerInstance = null; | 253 // failed. |
250 } else { | 254 _completeDoneError(error, stackTrace); |
251 // No new stream. No need to close target, as it have already | 255 } |
252 // failed. | 256 }); |
253 _completeDoneError(error, stackTrace); | 257 } |
254 } | |
255 }); | |
256 } | |
257 return _controllerInstance; | 258 return _controllerInstance; |
258 } | 259 } |
259 } | 260 } |
260 | 261 |
261 | 262 |
262 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { | 263 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { |
263 Encoding _encoding; | 264 Encoding _encoding; |
264 bool _encodingMutable = true; | 265 bool _encodingMutable = true; |
265 | 266 |
266 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) | 267 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) |
(...skipping 23 matching lines...) Expand all Loading... |
290 } while (iterator.moveNext()); | 291 } while (iterator.moveNext()); |
291 } else { | 292 } else { |
292 write(iterator.current); | 293 write(iterator.current); |
293 while (iterator.moveNext()) { | 294 while (iterator.moveNext()) { |
294 write(separator); | 295 write(separator); |
295 write(iterator.current); | 296 write(iterator.current); |
296 } | 297 } |
297 } | 298 } |
298 } | 299 } |
299 | 300 |
300 void writeln([Object obj = ""]) { | 301 void writeln([Object object = ""]) { |
301 write(obj); | 302 write(object); |
302 write("\n"); | 303 write("\n"); |
303 } | 304 } |
304 | 305 |
305 void writeCharCode(int charCode) { | 306 void writeCharCode(int charCode) { |
306 write(new String.fromCharCode(charCode)); | 307 write(new String.fromCharCode(charCode)); |
307 } | 308 } |
308 } | 309 } |
OLD | NEW |