Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(221)

Side by Side Diff: sdk/lib/io/io_sink.dart

Issue 14028017: Remove .writeStream, .consume and rewrite IOSink to correctly implement a (sane) well-defined behav… (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Add new test file. Created 7 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 /** 7 /**
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide
9 * utility functions for writing to the StreamConsumer directly. The 9 * utility functions for writing to the StreamConsumer directly. The
10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], 10 * [IOSink] buffers the input given by [write], [writeAll], [writeln],
Søren Gjesse 2013/04/15 06:56:30 [write], [writeAll], [writeln], [writeCharCode] =>
Anders Johnsen 2013/04/15 07:35:20 Done.
11 * [writeCharCode] and [add] and will delay a [consume] or 11 * [writeCharCode] and [add] and will delay a [addStream] until
12 * [writeStream] until the buffer is flushed. 12 * the buffer is flushed.
13 * 13 *
14 * When the [IOSink] is bound to a stream (through either [consume] 14 * When the [IOSink] is bound to a stream (through [addStream]) any call
15 * or [writeStream]) any call to the [IOSink] will throw a 15 * to the [IOSink] will throw a [StateError].
Søren Gjesse 2013/04/15 06:56:30 Maybe mention that when the addStream future compl
Anders Johnsen 2013/04/15 07:35:20 Done.
16 * [StateError].
17 */ 16 */
18 abstract class IOSink<T> 17 abstract class IOSink
19 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> { 18 implements StreamConsumer<List<int>>, StringSink, EventSink<List<int>> {
20 factory IOSink(StreamConsumer<List<int>> target, 19 factory IOSink(StreamConsumer<List<int>> target,
21 {Encoding encoding: Encoding.UTF_8}) 20 {Encoding encoding: Encoding.UTF_8})
22 => new _IOSinkImpl(target, encoding); 21 => new _IOSinkImpl(target, encoding);
23 22
24 /** 23 /**
25 * The [Encoding] used when writing strings. Depending on the 24 * The [Encoding] used when writing strings. Depending on the
26 * underlying consumer this property might be mutable. 25 * underlying consumer this property might be mutable.
27 */ 26 */
28 Encoding encoding; 27 Encoding encoding;
29 28
30 /** 29 /**
31 * Writes the bytes uninterpreted to the consumer. 30 * Writes the bytes uninterpreted to the consumer.
32 */ 31 */
33 void add(List<int> data); 32 void add(List<int> data);
34 33
35 /** 34 /**
36 * Writes an error to the consumer. 35 * Writes an error to the consumer.
37 */ 36 */
38 void addError(AsyncError error); 37 void addError(AsyncError error);
39 38
40 /** 39 /**
41 * Provide functionality for piping to the [IOSink].
42 */
43 Future<T> consume(Stream<List<int>> stream);
44
45 /**
46 * Adds all elements of the given [stream] to `this`. 40 * Adds all elements of the given [stream] to `this`.
47 */ 41 */
48 Future<T> addStream(Stream<List<int>> stream); 42 Future addStream(Stream<List<int>> stream);
49
50 /**
51 * Like [consume], but will not close the target when done.
52 *
53 * *Deprecated*: use [addStream] instead.
54 */
55 Future<T> writeStream(Stream<List<int>> stream);
56 43
57 /** 44 /**
58 * Close the target. 45 * Close the target.
59 */ 46 */
60 // TODO(floitsch): Currently the future cannot be typed because it has
61 // hardcoded type Future<HttpClientResponse> in subclass HttpClientRequest.
62 Future close(); 47 Future close();
63 48
64 /** 49 /**
65 * Get future that will complete when all data has been written to 50 * Get a future that will complete when all synchronous have completed, or an
66 * the IOSink and it has been closed. 51 * error happened. This future is identical to the future returned from close.
67 */ 52 */
68 Future<T> get done; 53 Future get done;
69 } 54 }
70 55
71 56
72 class _IOSinkImpl<T> implements IOSink<T> { 57 class _IOSinkImpl implements IOSink {
73 final StreamConsumer<List<int>> _target; 58 final StreamConsumer<List<int>> _target;
74 59 Completer _doneCompleter = new Completer();
75 Completer _writeStreamCompleter; 60 Future _doneFuture;
76 StreamController<List<int>> _controllerInstance; 61 StreamController<List<int>> _controllerInstance;
77 Future<T> _pipeFuture; 62 Completer _controllerCompleter;
78 StreamSubscription<List<int>> _bindSubscription; 63 Encoding _encoding;
79 bool _paused = true; 64 bool _isClosed = false;
65 bool _isBound = false;
80 bool _encodingMutable = true; 66 bool _encodingMutable = true;
81 67
82 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding); 68 _IOSinkImpl(StreamConsumer<List<int>> this._target, this._encoding) {
83 69 _doneFuture = _doneCompleter.future;
84 Encoding _encoding; 70 }
85 71
86 Encoding get encoding => _encoding; 72 Encoding get encoding => _encoding;
87 73
88 void set encoding(Encoding value) { 74 void set encoding(Encoding value) {
89 if (!_encodingMutable) { 75 if (!_encodingMutable) {
90 throw new StateError("IOSink encoding is not mutable"); 76 throw new StateError("IOSink encoding is not mutable");
91 } 77 }
92 _encoding = value; 78 _encoding = value;
93 } 79 }
94 80
(...skipping 33 matching lines...) Expand 10 before | Expand all | Expand 10 after
128 void writeln([Object obj = ""]) { 114 void writeln([Object obj = ""]) {
129 write(obj); 115 write(obj);
130 write("\n"); 116 write("\n");
131 } 117 }
132 118
133 void writeCharCode(int charCode) { 119 void writeCharCode(int charCode) {
134 write(new String.fromCharCode(charCode)); 120 write(new String.fromCharCode(charCode));
135 } 121 }
136 122
137 void add(List<int> data) { 123 void add(List<int> data) {
138 if (_isBound) {
139 throw new StateError("IOSink is already bound to a stream");
140 }
141 _controller.add(data); 124 _controller.add(data);
142 } 125 }
143 126
144 void addError(AsyncError error) { 127 void addError(AsyncError error) {
128 _controller.addError(error);
129 }
130
131 Future addStream(Stream<List<int>> stream) {
145 if (_isBound) { 132 if (_isBound) {
146 throw new StateError("IOSink is already bound to a stream"); 133 throw new StateError("IOSink is already bound to a stream");
147 } 134 }
148 _controller.addError(error); 135 _isBound = true;
Søren Gjesse 2013/04/15 06:56:30 This looks good. Maybe add a comment like "wait fo
Anders Johnsen 2013/04/15 07:35:20 Done.
136 return _closeController().then((_) {
137 return _target.addStream(stream)
138 .whenComplete(() {
139 _isBound = false;
140 });
141 });
149 } 142 }
150 143
151 Future<T> consume(Stream<List<int>> stream) { 144 Future _closeController() {
152 if (_isBound) { 145 if (_controllerInstance == null) return new Future.immediate(null);
153 throw new StateError("IOSink is already bound to a stream"); 146 var future = _controllerCompleter.future;
154 } 147 _controllerInstance.close();
155 return _fillFromStream(stream); 148 return future;
156 }
157
158 Future<T> writeStream(Stream<List<int>> stream) {
159 return addStream(stream);
160 }
161
162 Future<T> addStream(Stream<List<int>> stream) {
163 if (_isBound) {
164 throw new StateError("IOSink is already bound to a stream");
165 }
166 return _fillFromStream(stream, unbind: true);
167 } 149 }
168 150
169 Future close() { 151 Future close() {
170 if (_isBound) { 152 if (_isBound) {
171 throw new StateError("IOSink is already bound to a stream"); 153 throw new StateError("IOSink is already bound to a stream");
Søren Gjesse 2013/04/15 06:56:30 Maybe the "already" in this message is confusing.
Anders Johnsen 2013/04/15 07:35:20 Done.
172 } 154 }
173 _controller.close(); 155 if (!_isClosed) {
174 return _pipeFuture; 156 _isClosed = true;
157 if (_controllerInstance != null) {
158 _controllerInstance.close();
159 } else {
160 _closeTarget();
161 }
162 }
163 return done;
175 } 164 }
176 165
177 Future<T> get done { 166 void _closeTarget() {
178 _controller; 167 _target.close()
179 return _pipeFuture; 168 .then((_) => _completeDone(),
169 onError: (error) => _completeDone(error));
180 } 170 }
181 171
182 void _completeWriteStreamCompleter([error]) { 172 Future get done => _doneFuture;
183 if (_writeStreamCompleter == null) return; 173
184 var tmp = _writeStreamCompleter; 174 void _completeDone([error]) {
185 _writeStreamCompleter = null; 175 if (_doneCompleter == null) return;
176 var tmp = _doneCompleter;
177 _doneCompleter = null;
186 if (error == null) { 178 if (error == null) {
187 _bindSubscription = null;
188 tmp.complete(); 179 tmp.complete();
189 } else { 180 } else {
190 tmp.completeError(error); 181 tmp.completeError(error);
191 } 182 }
192 } 183 }
193 184
194 StreamController<List<int>> get _controller { 185 StreamController<List<int>> get _controller {
186 if (_isBound) {
187 throw new StateError("IOSink is already bound to a stream");
Søren Gjesse 2013/04/15 06:56:30 Remove "already" here as well.
Anders Johnsen 2013/04/15 07:35:20 Done.
188 }
189 if (_isClosed) {
190 throw new StateError("IOSink is already closed");
Søren Gjesse 2013/04/15 06:56:30 And here.
Anders Johnsen 2013/04/15 07:35:20 Done.
191 }
195 if (_controllerInstance == null) { 192 if (_controllerInstance == null) {
Søren Gjesse 2013/04/15 06:56:30 This also looks nice!
Anders Johnsen 2013/04/15 07:35:20 Thanks :)
196 _controllerInstance = new StreamController<List<int>>( 193 _controllerInstance = new StreamController<List<int>>();
197 onPauseStateChange: _onPauseStateChange, 194 _controllerCompleter = new Completer();
198 onSubscriptionStateChange: _onSubscriptionStateChange); 195 _target.addStream(_controller.stream)
199 var future = _controller.stream.pipe(_target); 196 .then(
200 future.then((_) => _completeWriteStreamCompleter(), 197 (_) {
201 onError: (error) => _completeWriteStreamCompleter(error)); 198 if (_isBound) {
202 _pipeFuture = future.then((value) => value); 199 // A new stream takes over - forward errors to that stream.
Søren Gjesse 2013/04/15 06:56:30 I think this comment belongs in the onError case.
Anders Johnsen 2013/04/15 07:35:20 Done.
200 var completer = _controllerCompleter;
201 _controllerCompleter = null;
202 _controllerInstance = null;
203 completer.complete();
204 } else {
205 // No new stream, .close was called. Close _target.
206 _closeTarget();
207 }
208 },
209 onError: (error) {
210 if (_isBound) {
211 var completer = _controllerCompleter;
212 _controllerCompleter = null;
213 _controllerInstance = null;
214 completer.completeError(error);
215 } else {
216 _completeDone(error);
217 }
218 });
203 } 219 }
204 return _controllerInstance; 220 return _controllerInstance;
205 } 221 }
206
207 bool get _isBound => _bindSubscription != null;
208
209 void _onPauseStateChange() {
210 _paused = _controller.isPaused;
211 if (_controller.isPaused) {
212 _pause();
213 } else {
214 _resume();
215 }
216 }
217
218 void _pause() {
219 if (_bindSubscription != null) {
220 try {
221 // The subscription can be canceled at this point.
222 _bindSubscription.pause();
223 } catch (e) {
224 }
225 }
226 }
227
228 void _resume() {
229 if (_bindSubscription != null) {
230 try {
231 // The subscription can be canceled at this point.
232 _bindSubscription.resume();
233 } catch (e) {
234 }
235 }
236 }
237
238 void _onSubscriptionStateChange() {
239 if (_controller.hasListener) {
240 _paused = false;
241 _resume();
242 } else {
243 if (_bindSubscription != null) {
244 _bindSubscription.cancel();
245 _bindSubscription = null;
246 }
247 }
248 }
249
250 Future<T> _fillFromStream(Stream<List<int>> stream, {unbind: false}) {
251 _controller;
252 assert(_writeStreamCompleter == null);
253 if (unbind) {
254 _writeStreamCompleter = new Completer<T>();
255 }
256 _bindSubscription = stream.listen(
257 _controller.add,
258 onDone: () {
259 if (unbind) {
260 _completeWriteStreamCompleter();
261 } else {
262 _controller.close();
263 }
264 },
265 onError: _controller.addError);
266 if (_paused) _pause();
267 if (unbind) {
268 return _writeStreamCompleter.future;
269 } else {
270 return _pipeFuture;
271 }
272 }
273 } 222 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698