Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(277)

Side by Side Diff: sdk/lib/io/io_sink.dart

Issue 196423021: Move _StreamSinkImpl from dart:io to dart:async as StreamSinkAdapter. (Closed) Base URL: https://dart.googlecode.com/svn/branches/bleeding_edge/dart
Patch Set: Created 6 years, 9 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View unified diff | Download patch | Annotate | Revision Log
OLDNEW
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file
2 // for details. All rights reserved. Use of this source code is governed by a 2 // for details. All rights reserved. Use of this source code is governed by a
3 // BSD-style license that can be found in the LICENSE file. 3 // BSD-style license that can be found in the LICENSE file.
4 4
5 part of dart.io; 5 part of dart.io;
6 6
7 /** 7 /**
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide
9 * utility functions for writing to the StreamConsumer directly. The 9 * utility functions for writing to the StreamConsumer directly. The
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay
11 * an [addStream] until the buffer is flushed. 11 * an [addStream] until the buffer is flushed.
12 * 12 *
13 * When the [IOSink] is bound to a stream (through [addStream]) any call 13 * When the [IOSink] is bound to a stream (through [addStream]) any call
14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes, 14 * to the [IOSink] will throw a [StateError]. When the [addStream] completes,
15 * the [IOSink] will again be open for all calls. 15 * the [IOSink] will again be open to all calls.
Lasse Reichstein Nielsen 2014/03/18 15:34:45 ... will again accept all method calls.
Anders Johnsen 2014/03/19 11:48:55 Done.
16 * 16 *
17 * If data is added to the [IOSink] after the sink is closed, the data will be 17 * If data is added to the [IOSink] after the sink is closed, the data will be
18 * ignored. Use the [done] future to be notified when the [IOSink] is closed. 18 * ignored. Use the [done] future to be notified when the [IOSink] is closed.
19 */ 19 */
20 abstract class IOSink implements StreamSink<List<int>>, StringSink { 20 abstract class IOSink implements StreamSink<List<int>>, StringSink {
21 // TODO(ajohnsen): Make _encodingMutable an argument. 21 // TODO(ajohnsen): Make _encodingMutable an argument.
22 factory IOSink(StreamConsumer<List<int>> target, 22 factory IOSink(StreamConsumer<List<int>> target,
23 {Encoding encoding: UTF8}) 23 {Encoding encoding: UTF8})
24 => new _IOSinkImpl(target, encoding); 24 => new _IOSinkImpl(target, encoding);
25 25
(...skipping 91 matching lines...) Expand 10 before | Expand all | Expand 10 after
117 Future close(); 117 Future close();
118 118
119 /** 119 /**
120 * Get a future that will complete when the consumer closes, or when an 120 * Get a future that will complete when the consumer closes, or when an
121 * error occurs. This future is identical to the future returned by 121 * error occurs. This future is identical to the future returned by
122 * [close]. 122 * [close].
123 */ 123 */
124 Future get done; 124 Future get done;
125 } 125 }
126 126
127 class _StreamSinkImpl<T> implements StreamSink<T> {
128 final StreamConsumer<T> _target;
129 Completer _doneCompleter = new Completer();
130 Future _doneFuture;
131 StreamController<T> _controllerInstance;
132 Completer _controllerCompleter;
133 bool _isClosed = false;
134 bool _isBound = false;
135 bool _hasError = false;
136 127
137 _StreamSinkImpl(this._target) { 128 class _IOSinkImpl extends StreamSinkAdapter<List<int>> implements IOSink {
138 _doneFuture = _doneCompleter.future;
139 }
140
141 void add(T data) {
142 if (_isClosed) return;
143 _controller.add(data);
144 }
145
146 void addError(error, [StackTrace stackTrace]) =>
147 _controller.addError(error, stackTrace);
148
149 Future addStream(Stream<T> stream) {
150 if (_isBound) {
151 throw new StateError("StreamSink is already bound to a stream");
152 }
153 _isBound = true;
154 if (_hasError) return done;
155 // Wait for any sync operations to complete.
156 Future targetAddStream() {
157 return _target.addStream(stream)
158 .whenComplete(() {
159 _isBound = false;
160 });
161 }
162 if (_controllerInstance == null) return targetAddStream();
163 var future = _controllerCompleter.future;
164 _controllerInstance.close();
165 return future.then((_) => targetAddStream());
166 }
167
168 Future flush() {
169 if (_isBound) {
170 throw new StateError("StreamSink is bound to a stream");
171 }
172 if (_controllerInstance == null) return new Future.value(this);
173 // Adding an empty stream-controller will return a future that will complete
174 // when all data is done.
175 _isBound = true;
176 var future = _controllerCompleter.future;
177 _controllerInstance.close();
178 return future.whenComplete(() {
179 _isBound = false;
180 });
181 }
182
183 Future close() {
184 if (_isBound) {
185 throw new StateError("StreamSink is bound to a stream");
186 }
187 if (!_isClosed) {
188 _isClosed = true;
189 if (_controllerInstance != null) {
190 _controllerInstance.close();
191 } else {
192 _closeTarget();
193 }
194 }
195 return done;
196 }
197
198 void _closeTarget() {
199 _target.close()
200 .then((value) => _completeDone(value: value),
201 onError: (error) => _completeDone(error: error));
202 }
203
204 Future get done => _doneFuture;
205
206 void _completeDone({value, error}) {
207 if (_doneCompleter == null) return;
208 if (error == null) {
209 _doneCompleter.complete(value);
210 } else {
211 _hasError = true;
212 _doneCompleter.completeError(error);
213 }
214 _doneCompleter = null;
215 }
216
217 StreamController<T> get _controller {
218 if (_isBound) {
219 throw new StateError("StreamSink is bound to a stream");
220 }
221 if (_isClosed) {
222 throw new StateError("StreamSink is closed");
223 }
224 if (_controllerInstance == null) {
225 _controllerInstance = new StreamController<T>(sync: true);
226 _controllerCompleter = new Completer();
227 _target.addStream(_controller.stream)
228 .then(
229 (_) {
230 if (_isBound) {
231 // A new stream takes over - forward values to that stream.
232 _controllerCompleter.complete(this);
233 _controllerCompleter = null;
234 _controllerInstance = null;
235 } else {
236 // No new stream, .close was called. Close _target.
237 _closeTarget();
238 }
239 },
240 onError: (error) {
241 if (_isBound) {
242 // A new stream takes over - forward errors to that stream.
243 _controllerCompleter.completeError(error);
244 _controllerCompleter = null;
245 _controllerInstance = null;
246 } else {
247 // No new stream. No need to close target, as it have already
248 // failed.
249 _completeDone(error: error);
250 }
251 });
252 }
253 return _controllerInstance;
254 }
255 }
256
257
258 class _IOSinkImpl extends _StreamSinkImpl<List<int>> implements IOSink {
259 Encoding _encoding; 129 Encoding _encoding;
260 bool _encodingMutable = true; 130 bool _encodingMutable = true;
261 131
262 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding) 132 _IOSinkImpl(StreamConsumer<List<int>> target, this._encoding)
263 : super(target); 133 : super(target);
264 134
265 Encoding get encoding => _encoding; 135 Encoding get encoding => _encoding;
266 136
267 void set encoding(Encoding value) { 137 void set encoding(Encoding value) {
268 if (!_encodingMutable) { 138 if (!_encodingMutable) {
(...skipping 37 matching lines...) Expand 10 before | Expand all | Expand 10 after
306 176
307 void writeln([Object obj = ""]) { 177 void writeln([Object obj = ""]) {
308 write(obj); 178 write(obj);
309 write("\n"); 179 write("\n");
310 } 180 }
311 181
312 void writeCharCode(int charCode) { 182 void writeCharCode(int charCode) {
313 write(new String.fromCharCode(charCode)); 183 write(new String.fromCharCode(charCode));
314 } 184 }
315 } 185 }
OLDNEW

Powered by Google App Engine
This is Rietveld 408576698