OLD | NEW |
| (Empty) |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | |
2 // for details. All rights reserved. Use of this source code is governed by a | |
3 // BSD-style license that can be found in the LICENSE file. | |
4 | |
5 part of dart.io; | |
6 | |
7 /** | |
8 * A combined byte and text output. | |
9 * | |
10 * An [IOSink] combines a [StreamSink] of bytes with a [StringSink], | |
11 * and allows easy output of both bytes and text. | |
12 * | |
13 * Writing text ([write]) and adding bytes ([add]) may be interleaved freely. | |
14 * | |
15 * While a stream is being added using [addStream], any further attempts | |
16 * to add or write to the [IOSink] will fail until the [addStream] completes. | |
17 * | |
18 * If data is added to the [IOSink] after the sink is closed, the data will be | |
19 * ignored. Use the [done] future to be notified when the [IOSink] is closed. | |
20 */ | |
21 abstract class IOSink implements StreamSink<List<int>>, StringSink { | |
22 | |
23 /** | |
24 * Create an [IOSink] that outputs to a [target] [StreamConsumer] of bytes. | |
25 * | |
26 * Text written to [StreamSink] methods is encoded to bytes using [encoding] | |
27 * before being output on [target]. | |
28 */ | |
29 factory IOSink(StreamConsumer<List<int>> target, | |
30 {Encoding encoding: UTF8}) | |
31 => new _IOSinkImpl(target, encoding); | |
32 | |
33 /** | |
34 * The [Encoding] used when writing strings. Depending on the | |
35 * underlying consumer this property might be mutable. | |
36 */ | |
37 Encoding encoding; | |
38 | |
39 /** | |
40 * Adds byte [data] to the target consumer, ignoring [encoding]. | |
41 * | |
42 * The [encoding] does not apply to this method, and the `data` list is passed | |
43 * directly to the target consumer as a stream event. | |
44 * | |
45 * This function must not be called when a stream is currently being added | |
46 * using [addStream]. | |
47 * | |
48 * This operation is non-blocking. See [flush] or [done] for how to get any | |
49 * errors generated by this call. | |
50 * | |
51 * The data list should not be modified after it has been passed to `add`. | |
52 */ | |
53 void add(List<int> data); | |
54 | |
55 /** | |
56 * Converts [obj] to a String by invoking [Object.toString] and | |
57 * [add]s the encoding of the result to the target consumer. | |
58 * | |
59 * This operation is non-blocking. See [flush] or [done] for how to get any | |
60 * errors generated by this call. | |
61 */ | |
62 void write(Object obj); | |
63 | |
64 /** | |
65 * Iterates over the given [objects] and [write]s them in sequence. | |
66 * | |
67 * If [separator] is provided, a `write` with the `separator` is performed | |
68 * between any two elements of objects`. | |
69 * | |
70 * This operation is non-blocking. See [flush] or [done] for how to get any | |
71 * errors generated by this call. | |
72 */ | |
73 void writeAll(Iterable objects, [String separator = ""]); | |
74 | |
75 /** | |
76 * Converts [obj] to a String by invoking [Object.toString] and | |
77 * writes the result to `this`, followed by a newline. | |
78 * | |
79 * This operation is non-blocking. See [flush] or [done] for how to get any | |
80 * errors generated by this call. | |
81 */ | |
82 void writeln([Object obj = ""]); | |
83 | |
84 /** | |
85 * Writes the character of [charCode]. | |
86 * | |
87 * This method is equivalent to `write(new String.fromCharCode(charCode))`. | |
88 * | |
89 * This operation is non-blocking. See [flush] or [done] for how to get any | |
90 * errors generated by this call. | |
91 */ | |
92 void writeCharCode(int charCode); | |
93 | |
94 /** | |
95 * Passes the error to the target consumer as an error event. | |
96 * | |
97 * This function must not be called when a stream is currently being added | |
98 * using [addStream]. | |
99 * | |
100 * This operation is non-blocking. See [flush] or [done] for how to get any | |
101 * errors generated by this call. | |
102 */ | |
103 void addError(error, [StackTrace stackTrace]); | |
104 | |
105 /** | |
106 * Adds all elements of the given [stream] to `this`. | |
107 * | |
108 * Returns a [Future] that completes when | |
109 * all elements of the given [stream] are added to `this`. | |
110 */ | |
111 Future addStream(Stream<List<int>> stream); | |
112 | |
113 /** | |
114 * Returns a [Future] that completes once all buffered data is accepted by the | |
115 * to underlying [StreamConsumer]. | |
116 * | |
117 * This method must not be called while an [addStream] is incomplete. | |
118 * | |
119 * NOTE: This is not necessarily the same as the data being flushed by the | |
120 * operating system. | |
121 */ | |
122 Future flush(); | |
123 | |
124 /** | |
125 * Close the target consumer. | |
126 */ | |
127 Future close(); | |
128 | |
129 /** | |
130 * Get a future that will complete when the consumer closes, or when an | |
131 * error occurs. This future is identical to the future returned by | |
132 * [close]. | |
133 */ | |
134 Future get done; | |
135 } | |
136 | |
137 class _StreamSinkImpl<T> implements StreamSink<T> { | |
138 final StreamConsumer<T> _target; | |
139 final Completer _doneCompleter = new Completer(); | |
140 StreamController<T> _controllerInstance; | |
141 Completer _controllerCompleter; | |
142 bool _isClosed = false; | |
143 bool _isBound = false; | |
144 bool _hasError = false; | |
145 | |
146 _StreamSinkImpl(this._target); | |
147 | |
148 void add(T data) { | |
149 if (_isClosed) return; | |
150 _controller.add(data); | |
151 } | |
152 | |
153 void addError(error, [StackTrace stackTrace]) { | |
154 _controller.addError(error, stackTrace); | |
155 } | |
156 | |
157 Future addStream(Stream<T> stream) { | |
158 if (_isBound) { | |
159 throw new StateError("StreamSink is already bound to a stream"); | |
160 } | |
161 _isBound = true; | |
162 if (_hasError) return done; | |
163 // Wait for any sync operations to complete. | |
164 Future targetAddStream() { | |
165 return _target.addStream(stream) | |
166 .whenComplete(() { | |
167 _isBound = false; | |
168 }); | |
169 } | |
170 if (_controllerInstance == null) return targetAddStream(); | |
171 var future = _controllerCompleter.future; | |
172 _controllerInstance.close(); | |
173 return future.then((_) => targetAddStream()); | |
174 } | |
175 | |
176 Future flush() { | |
177 if (_isBound) { | |
178 throw new StateError("StreamSink is bound to a stream"); | |
179 } | |
180 if (_controllerInstance == null) return new Future.value(this); | |
181 // Adding an empty stream-controller will return a future that will complete | |
182 // when all data is done. | |
183 _isBound = true; | |
184 var future = _controllerCompleter.future; | |
185 _controllerInstance.close(); | |
186 return future.whenComplete(() { | |
187 _isBound = false; | |
188 }); | |
189 } | |
190 | |
191 Future close() { | |
192 if (_isBound) { | |
193 throw new StateError("StreamSink is bound to a stream"); | |
194 } | |
195 if (!_isClosed) { | |
196 _isClosed = true; | |
197 if (_controllerInstance != null) { | |
198 _controllerInstance.close(); | |
199 } else { | |
200 _closeTarget(); | |
201 } | |
202 } | |
203 return done; | |
204 } | |
205 | |
206 void _closeTarget() { | |
207 _target.close().then(_completeDoneValue, onError: _completeDoneError); | |
208 } | |
209 | |
210 Future get done => _doneCompleter.future; | |
211 | |
212 void _completeDoneValue(value) { | |
213 if (!_doneCompleter.isCompleted) { | |
214 _doneCompleter.complete(value); | |
215 } | |
216 } | |
217 | |
218 void _completeDoneError(error, StackTrace stackTrace) { | |
219 if (!_doneCompleter.isCompleted) { | |
220 _hasError = true; | |
221 _doneCompleter.completeError(error, stackTrace); | |
222 } | |
223 } | |
224 | |
225 StreamController<T> get _controller { | |
226 if (_isBound) { | |
227 throw new StateError("StreamSink is bound to a stream"); | |
228 } | |
229 if (_isClosed) { | |
230 throw new StateError("StreamSink is closed"); | |
231 } | |
232 if (_controllerInstance == null) { | |
233 _controllerInstance = new StreamController<T>(sync: true); | |
234 _controllerCompleter = new Completer(); | |
235 _target.addStream(_controller.stream).then((_) { | |
236 if (_isBound) { | |
237 // A new stream takes over - forward values to that stream. | |
238 _controllerCompleter.complete(this); | |
239 _controllerCompleter = null; | |
240 _controllerInstance = null; | |
241 } else { | |
242 // No new stream, .close was called. Close _target. | |
243 _closeTarget(); | |
244 } | |
245 }, onError: (error, stackTrace) { | |
246 if (_isBound) { | |
247 // A new stream takes over - forward errors to that stream. | |
248 _controllerCompleter.completeError(error, stackTrace); | |
249 _controllerCompleter = null; | |
250 _controllerInstance = null; | |
251 } else { | |
252 // No new stream. No need to close target, as it has already | |
253 // failed. | |
254 _completeDoneError(error, stackTrace); | |
255 } | |
256 }); | |
257 } | |
258 return _controllerInstance; | |
259 } | |
260 } | |
261 | |
262 | |
263 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { | |
264 Encoding _encoding; | |
265 bool _encodingMutable = true; | |
266 | |
267 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) | |
268 : super(target); | |
269 | |
270 Encoding get encoding => _encoding; | |
271 | |
272 void set encoding(Encoding value) { | |
273 if (!_encodingMutable) { | |
274 throw new StateError("IOSink encoding is not mutable"); | |
275 } | |
276 _encoding = value; | |
277 } | |
278 | |
279 void write(Object obj) { | |
280 String string = '$obj'; | |
281 if (string.isEmpty) return; | |
282 add(_encoding.encode(string)); | |
283 } | |
284 | |
285 void writeAll(Iterable objects, [String separator = ""]) { | |
286 Iterator iterator = objects.iterator; | |
287 if (!iterator.moveNext()) return; | |
288 if (separator.isEmpty) { | |
289 do { | |
290 write(iterator.current); | |
291 } while (iterator.moveNext()); | |
292 } else { | |
293 write(iterator.current); | |
294 while (iterator.moveNext()) { | |
295 write(separator); | |
296 write(iterator.current); | |
297 } | |
298 } | |
299 } | |
300 | |
301 void writeln([Object object = ""]) { | |
302 write(object); | |
303 write("\n"); | |
304 } | |
305 | |
306 void writeCharCode(int charCode) { | |
307 write(new String.fromCharCode(charCode)); | |
308 } | |
309 } | |
OLD | NEW |