OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by all [StringSink] methods and will delay | 10 * [IOSink] buffers the input given by all [StringSink] methods and will delay |
(...skipping 92 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
103 _isBound = false; | 103 _isBound = false; |
104 }); | 104 }); |
105 } | 105 } |
106 if (_controllerInstance == null) return targetAddStream(); | 106 if (_controllerInstance == null) return targetAddStream(); |
107 var future = _controllerCompleter.future; | 107 var future = _controllerCompleter.future; |
108 _controllerInstance.close(); | 108 _controllerInstance.close(); |
109 return future.then((_) => targetAddStream()); | 109 return future.then((_) => targetAddStream()); |
110 } | 110 } |
111 | 111 |
112 Future flush() { | 112 Future flush() { |
| 113 if (_isBound) { |
| 114 throw new StateError("StreamSink is bound to a stream"); |
| 115 } |
| 116 if (_controllerInstance == null) return new Future.value(this); |
113 // Adding an empty stream-controller will return a future that will complete | 117 // Adding an empty stream-controller will return a future that will complete |
114 // when all data is done. | 118 // when all data is done. |
115 var controller = new StreamController()..close(); | 119 _isBound = true; |
116 return addStream(controller.stream).then((_) => this); | 120 var future = _controllerCompleter.future; |
| 121 _controllerInstance.close(); |
| 122 return future.whenComplete(() { |
| 123 _isBound = false; |
| 124 }); |
117 } | 125 } |
118 | 126 |
119 Future close() { | 127 Future close() { |
120 if (_isBound) { | 128 if (_isBound) { |
121 throw new StateError("StreamSink is bound to a stream"); | 129 throw new StateError("StreamSink is bound to a stream"); |
122 } | 130 } |
123 if (!_isClosed) { | 131 if (!_isClosed) { |
124 _isClosed = true; | 132 _isClosed = true; |
125 if (_controllerInstance != null) { | 133 if (_controllerInstance != null) { |
126 _controllerInstance.close(); | 134 _controllerInstance.close(); |
(...skipping 31 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
158 throw new StateError("StreamSink is closed"); | 166 throw new StateError("StreamSink is closed"); |
159 } | 167 } |
160 if (_controllerInstance == null) { | 168 if (_controllerInstance == null) { |
161 _controllerInstance = new StreamController<T>(sync: true); | 169 _controllerInstance = new StreamController<T>(sync: true); |
162 _controllerCompleter = new Completer(); | 170 _controllerCompleter = new Completer(); |
163 _target.addStream(_controller.stream) | 171 _target.addStream(_controller.stream) |
164 .then( | 172 .then( |
165 (_) { | 173 (_) { |
166 if (_isBound) { | 174 if (_isBound) { |
167 // A new stream takes over - forward values to that stream. | 175 // A new stream takes over - forward values to that stream. |
168 _controllerCompleter.complete(); | 176 _controllerCompleter.complete(this); |
169 _controllerCompleter = null; | 177 _controllerCompleter = null; |
170 _controllerInstance = null; | 178 _controllerInstance = null; |
171 } else { | 179 } else { |
172 // No new stream, .close was called. Close _target. | 180 // No new stream, .close was called. Close _target. |
173 _closeTarget(); | 181 _closeTarget(); |
174 } | 182 } |
175 }, | 183 }, |
176 onError: (error) { | 184 onError: (error) { |
177 if (_isBound) { | 185 if (_isBound) { |
178 // A new stream takes over - forward errors to that stream. | 186 // A new stream takes over - forward errors to that stream. |
(...skipping 63 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
242 | 250 |
243 void writeln([Object obj = ""]) { | 251 void writeln([Object obj = ""]) { |
244 write(obj); | 252 write(obj); |
245 write("\n"); | 253 write("\n"); |
246 } | 254 } |
247 | 255 |
248 void writeCharCode(int charCode) { | 256 void writeCharCode(int charCode) { |
249 write(new String.fromCharCode(charCode)); | 257 write(new String.fromCharCode(charCode)); |
250 } | 258 } |
251 } | 259 } |
OLD | NEW |