Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(197)

Side by Side Diff: sdk/lib/io/io_sink.dart

Issue 1517173002: Rewrite IOSink documentation, plus some clean-up. (Closed) Base URL: https://github.com/dart-lang/sdk.git@master
Patch Set: Created 5 years ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 /** 7 /**
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide 8 * A combined byte and text output.
9 * utility functions for writing to the StreamConsumer directly. The
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay
11 * an [addStream] until the buffer is flushed.
12 * 9 *
13 * When the [IOSink] is bound to a stream (through [addStream]) any call 10 * An [IOSink] combines a [StreamSink] of bytes with a [StringSink],
14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, 11 * and allows easy output of both bytes and text.
15 * the [IOSink] will again be open for all calls. 12 *
13 * Writing text ([write]) and adding bytes ([add]) may be interleaved freely.
14 *
15 * While a stream is being added using [addStream], any further attempts
16 * to add or write to the [IOSink] will fail until the [addStream] completes.
16 * 17 *
17 * If data is added to the [IOSink] after the sink is closed, the data will be 18 * If data is added to the [IOSink] after the sink is closed, the data will be
18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. 19 * ignored. Use the [done] future to be notified when the [IOSink] is closed.
19 */ 20 */
20 abstract class IOSink implements StreamSink<List<int>>, StringSink { 21 abstract class IOSink implements StreamSink<List<int>>, StringSink {
21 // TODO(ajohnsen): Make _encodingMutable an argument. 22
23 /**
24 * Create an [IOSink] that outputs to a [target] [StreamConsumer] of bytes.
25 *
26 * Text written to [StreamSink] methods is encoded to bytes using [encoding]
27 * before being output on [target].
28 */
22 factory IOSink(StreamConsumer<List<int>> target, 29 factory IOSink(StreamConsumer<List<int>> target,
23 {Encoding encoding: UTF8}) 30 {Encoding encoding: UTF8})
24 => new _IOSinkImpl(target, encoding); 31 => new _IOSinkImpl(target, encoding);
25 32
26 /** 33 /**
27 * The [Encoding] used when writing strings. Depending on the 34 * The [Encoding] used when writing strings. Depending on the
28 * underlying consumer this property might be mutable. 35 * underlying consumer this property might be mutable.
29 */ 36 */
30 Encoding encoding; 37 Encoding encoding;
31 38
32 /** 39 /**
33 * Adds [data] to the target consumer, ignoring [encoding]. 40 * Adds byte [data] to the target consumer, ignoring [encoding].
34 * 41 *
35 * The [encoding] does not apply to this method, and the `data` list is passed 42 * The [encoding] does not apply to this method, and the `data` list is passed
36 * directly to the target consumer as a stream event. 43 * directly to the target consumer as a stream event.
37 * 44 *
38 * This function must not be called when a stream is currently being added 45 * This function must not be called when a stream is currently being added
39 * using [addStream]. 46 * using [addStream].
40 * 47 *
41 * This operation is non-blocking. See [flush] or [done] for how to get any 48 * This operation is non-blocking. See [flush] or [done] for how to get any
42 * errors generated by this call. 49 * errors generated by this call.
43 * 50 *
44 * The data list should not be modified after it has been passed to `add`. 51 * The data list should not be modified after it has been passed to `add`.
45 */ 52 */
46 void add(List<int> data); 53 void add(List<int> data);
47 54
48 /** 55 /**
49 * Converts [obj] to a String by invoking [Object.toString] and 56 * Converts [obj] to a String by invoking [Object.toString] and
50 * [add]s the encoding of the result to the target consumer. 57 * [add]s the encoding of the result to the target consumer.
51 * 58 *
52 * This operation is non-blocking. See [flush] or [done] for how to get any 59 * This operation is non-blocking. See [flush] or [done] for how to get any
53 * errors generated by this call. 60 * errors generated by this call.
54 */ 61 */
55 void write(Object obj); 62 void write(Object obj);
56 63
57 /** 64 /**
58 * Iterates over the given [objects] and [write]s them in sequence. 65 * Iterates over the given [objects] and [write]s them in sequence.
59 * 66 *
60 * If [separator] is provided, a `write` with the `separator` is performed 67 * If [separator] is provided, a `write` with the `separator` is performed
61 * between any two elements of `objects`. 68 * between any two elements of objects`.
62 * 69 *
63 * This operation is non-blocking. See [flush] or [done] for how to get any 70 * This operation is non-blocking. See [flush] or [done] for how to get any
64 * errors generated by this call. 71 * errors generated by this call.
65 */ 72 */
66 void writeAll(Iterable objects, [String separator = ""]); 73 void writeAll(Iterable objects, [String separator = ""]);
67 74
68 /** 75 /**
69 * Converts [obj] to a String by invoking [Object.toString] and 76 * Converts [obj] to a String by invoking [Object.toString] and
70 * writes the result to `this`, followed by a newline. 77 * writes the result to `this`, followed by a newline.
71 * 78 *
72 * This operation is non-blocking. See [flush] or [done] for how to get any 79 * This operation is non-blocking. See [flush] or [done] for how to get any
73 * errors generated by this call. 80 * errors generated by this call.
74 */ 81 */
75 void writeln([Object obj = ""]); 82 void writeln([Object obj = ""]);
76 83
77 /** 84 /**
78 * Writes the [charCode] to `this`. 85 * Writes the character of [charCode].
79 * 86 *
80 * This method is equivalent to `write(new String.fromCharCode(charCode))`. 87 * This method is equivalent to `write(new String.fromCharCode(charCode))`.
81 * 88 *
82 * This operation is non-blocking. See [flush] or [done] for how to get any 89 * This operation is non-blocking. See [flush] or [done] for how to get any
83 * errors generated by this call. 90 * errors generated by this call.
84 */ 91 */
85 void writeCharCode(int charCode); 92 void writeCharCode(int charCode);
86 93
87 /** 94 /**
88 * Passes the error to the target consumer as an error event. 95 * Passes the error to the target consumer as an error event.
(...skipping 11 matching lines...) Expand all
100 * 107 *
101 * Returns a [Future] that completes when 108 * Returns a [Future] that completes when
102 * all elements of the given [stream] are added to `this`. 109 * all elements of the given [stream] are added to `this`.
103 */ 110 */
104 Future addStream(Stream<List<int>> stream); 111 Future addStream(Stream<List<int>> stream);
105 112
106 /** 113 /**
107 * Returns a [Future] that completes once all buffered data is accepted by the 114 * Returns a [Future] that completes once all buffered data is accepted by the
108 * to underlying [StreamConsumer]. 115 * to underlying [StreamConsumer].
109 * 116 *
110 * It's an error to call this method, while an [addStream] is incomplete. 117 * This method must not be called while an [addStream] is incomplete.
111 * 118 *
112 * NOTE: This is not necessarily the same as the data being flushed by the 119 * NOTE: This is not necessarily the same as the data being flushed by the
113 * operating system. 120 * operating system.
114 */ 121 */
115 Future flush(); 122 Future flush();
116 123
117 /** 124 /**
118 * Close the target consumer. 125 * Close the target consumer.
119 */ 126 */
120 Future close(); 127 Future close();
121 128
122 /** 129 /**
123 * Get a future that will complete when the consumer closes, or when an 130 * Get a future that will complete when the consumer closes, or when an
124 * error occurs. This future is identical to the future returned by 131 * error occurs. This future is identical to the future returned by
125 * [close]. 132 * [close].
126 */ 133 */
127 Future get done; 134 Future get done;
128 } 135 }
129 136
130 class _StreamSinkImpl<T> implements StreamSink<T> { 137 class _StreamSinkImpl<T> implements StreamSink<T> {
131 final StreamConsumer<T> _target; 138 final StreamConsumer<T> _target;
132 Completer _doneCompleter = new Completer(); 139 final Completer _doneCompleter = new Completer();
133 Future _doneFuture;
134 StreamController<T> _controllerInstance; 140 StreamController<T> _controllerInstance;
135 Completer _controllerCompleter; 141 Completer _controllerCompleter;
136 bool _isClosed = false; 142 bool _isClosed = false;
137 bool _isBound = false; 143 bool _isBound = false;
138 bool _hasError = false; 144 bool _hasError = false;
139 145
140 _StreamSinkImpl(this._target) { 146 _StreamSinkImpl(this._target);
141 _doneFuture = _doneCompleter.future;
142 }
143 147
144 void add(T data) { 148 void add(T data) {
145 if (_isClosed) return; 149 if (_isClosed) return;
146 _controller.add(data); 150 _controller.add(data);
147 } 151 }
148 152
149 void addError(error, [StackTrace stackTrace]) { 153 void addError(error, [StackTrace stackTrace]) {
150 _controller.addError(error, stackTrace); 154 _controller.addError(error, stackTrace);
151 } 155 }
152 156
(...skipping 20 matching lines...) Expand all
173 if (_isBound) { 177 if (_isBound) {
174 throw new StateError("StreamSink is bound to a stream"); 178 throw new StateError("StreamSink is bound to a stream");
175 } 179 }
176 if (_controllerInstance == null) return new Future.value(this); 180 if (_controllerInstance == null) return new Future.value(this);
177 // Adding an empty stream-controller will return a future that will complete 181 // Adding an empty stream-controller will return a future that will complete
178 // when all data is done. 182 // when all data is done.
179 _isBound = true; 183 _isBound = true;
180 var future = _controllerCompleter.future; 184 var future = _controllerCompleter.future;
181 _controllerInstance.close(); 185 _controllerInstance.close();
182 return future.whenComplete(() { 186 return future.whenComplete(() {
183 _isBound = false; 187 _isBound = false;
184 }); 188 });
185 } 189 }
186 190
187 Future close() { 191 Future close() {
188 if (_isBound) { 192 if (_isBound) {
189 throw new StateError("StreamSink is bound to a stream"); 193 throw new StateError("StreamSink is bound to a stream");
190 } 194 }
191 if (!_isClosed) { 195 if (!_isClosed) {
192 _isClosed = true; 196 _isClosed = true;
193 if (_controllerInstance != null) { 197 if (_controllerInstance != null) {
194 _controllerInstance.close(); 198 _controllerInstance.close();
195 } else { 199 } else {
196 _closeTarget(); 200 _closeTarget();
197 } 201 }
198 } 202 }
199 return done; 203 return done;
200 } 204 }
201 205
202 void _closeTarget() { 206 void _closeTarget() {
203 _target.close().then(_completeDoneValue, onError: _completeDoneError); 207 _target.close().then(_completeDoneValue, onError: _completeDoneError);
204 } 208 }
205 209
206 Future get done => _doneFuture; 210 Future get done => _doneCompleter.future;
207 211
208 void _completeDoneValue(value) { 212 void _completeDoneValue(value) {
209 if (_doneCompleter == null) return; 213 if (!_doneCompleter.isCompleted) {
210 _doneCompleter.complete(value); 214 _doneCompleter.complete(value);
211 _doneCompleter = null; 215 }
212 } 216 }
213 217
214 void _completeDoneError(error, StackTrace stackTrace) { 218 void _completeDoneError(error, StackTrace stackTrace) {
215 if (_doneCompleter == null) return; 219 if (!_doneCompleter.isCompleted) {
216 _hasError = true; 220 _hasError = true;
217 _doneCompleter.completeError(error, stackTrace); 221 _doneCompleter.completeError(error, stackTrace);
218 _doneCompleter = null; 222 }
219 } 223 }
220 224
221 StreamController<T> get _controller { 225 StreamController<T> get _controller {
222 if (_isBound) { 226 if (_isBound) {
223 throw new StateError("StreamSink is bound to a stream"); 227 throw new StateError("StreamSink is bound to a stream");
224 } 228 }
225 if (_isClosed) { 229 if (_isClosed) {
226 throw new StateError("StreamSink is closed"); 230 throw new StateError("StreamSink is closed");
227 } 231 }
228 if (_controllerInstance == null) { 232 if (_controllerInstance == null) {
229 _controllerInstance = new StreamController<T>(sync: true); 233 _controllerInstance = new StreamController<T>(sync: true);
230 _controllerCompleter = new Completer(); 234 _controllerCompleter = new Completer();
231 _target.addStream(_controller.stream) 235 _target.addStream(_controller.stream).then((_) {
232 .then( 236 if (_isBound) {
233 (_) { 237 // A new stream takes over - forward values to that stream.
234 if (_isBound) { 238 _controllerCompleter.complete(this);
235 // A new stream takes over - forward values to that stream. 239 _controllerCompleter = null;
236 _controllerCompleter.complete(this); 240 _controllerInstance = null;
237 _controllerCompleter = null; 241 } else {
238 _controllerInstance = null; 242 // No new stream, .close was called. Close _target.
239 } else { 243 _closeTarget();
240 // No new stream, .close was called. Close _target. 244 }
241 _closeTarget(); 245 }, onError: (error, stackTrace) {
242 } 246 if (_isBound) {
243 }, 247 // A new stream takes over - forward errors to that stream.
244 onError: (error, stackTrace) { 248 _controllerCompleter.completeError(error, stackTrace);
245 if (_isBound) { 249 _controllerCompleter = null;
246 // A new stream takes over - forward errors to that stream. 250 _controllerInstance = null;
247 _controllerCompleter.completeError(error, stackTrace); 251 } else {
248 _controllerCompleter = null; 252 // No new stream. No need to close target, as it has already
249 _controllerInstance = null; 253 // failed.
250 } else { 254 _completeDoneError(error, stackTrace);
251 // No new stream. No need to close target, as it have already 255 }
252 // failed. 256 });
253 _completeDoneError(error, stackTrace); 257 }
254 }
255 });
256 }
257 return _controllerInstance; 258 return _controllerInstance;
258 } 259 }
259 } 260 }
260 261
261 262
262 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink { 263 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink {
263 Encoding _encoding; 264 Encoding _encoding;
264 bool _encodingMutable = true; 265 bool _encodingMutable = true;
265 266
266 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) 267 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding)
(...skipping 23 matching lines...) Expand all
290 } while (iterator.moveNext()); 291 } while (iterator.moveNext());
291 } else { 292 } else {
292 write(iterator.current); 293 write(iterator.current);
293 while (iterator.moveNext()) { 294 while (iterator.moveNext()) {
294 write(separator); 295 write(separator);
295 write(iterator.current); 296 write(iterator.current);
296 } 297 }
297 } 298 }
298 } 299 }
299 300
300 void writeln([Object obj = ""]) { 301 void writeln([Object object = ""]) {
301 write(obj); 302 write(object);
302 write("\n"); 303 write("\n");
303 } 304 }
304 305
305 void writeCharCode(int charCode) { 306 void writeCharCode(int charCode) {
306 write(new String.fromCharCode(charCode)); 307 write(new String.fromCharCode(charCode));
307 } 308 }
308 } 309 }
OLDNEW
« no previous file with comments | « no previous file | no next file » | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698