OLD | NEW |
1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file | 1 // Copyright (c) 2013, the Dart project authors. Please see the AUTHORS file |
2 // for details. All rights reserved. Use of this source code is governed by a | 2 // for details. All rights reserved. Use of this source code is governed by a |
3 // BSD-style license that can be found in the LICENSE file. | 3 // BSD-style license that can be found in the LICENSE file. |
4 | 4 |
5 part of dart.io; | 5 part of dart.io; |
6 | 6 |
7 /** | 7 /** |
8 * Helper class to wrap a [StreamConsumer<List<int>, T>] and provide | 8 * Helper class to wrap a [StreamConsumer<List<int>, T>] and provide |
9 * utility functions for writing to the StreamConsumer directly. The | 9 * utility functions for writing to the StreamConsumer directly. The |
10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], | 10 * [IOSink] buffers the input given by [write], [writeAll], [writeln], |
(...skipping 119 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
130 | 130 |
131 void close() { | 131 void close() { |
132 if (_isBound) { | 132 if (_isBound) { |
133 throw new StateError("IOSink is already bound to a stream"); | 133 throw new StateError("IOSink is already bound to a stream"); |
134 } | 134 } |
135 _controller.close(); | 135 _controller.close(); |
136 } | 136 } |
137 | 137 |
138 Future<T> get done { | 138 Future<T> get done { |
139 _controller; | 139 _controller; |
140 return _pipeFuture.then((_) => this); | 140 return _pipeFuture; |
141 } | 141 } |
142 | 142 |
143 void _completeWriteStreamCompleter([error]) { | 143 void _completeWriteStreamCompleter([error]) { |
144 if (_writeStreamCompleter == null) return; | 144 if (_writeStreamCompleter == null) return; |
145 var tmp = _writeStreamCompleter; | 145 var tmp = _writeStreamCompleter; |
146 _writeStreamCompleter = null; | 146 _writeStreamCompleter = null; |
147 if (error == null) { | 147 if (error == null) { |
148 _bindSubscription = null; | 148 _bindSubscription = null; |
149 tmp.complete(); | 149 tmp.complete(); |
150 } else { | 150 } else { |
151 tmp.completeError(error); | 151 tmp.completeError(error); |
152 } | 152 } |
153 } | 153 } |
154 | 154 |
155 StreamController<List<int>> get _controller { | 155 StreamController<List<int>> get _controller { |
156 if (_controllerInstance == null) { | 156 if (_controllerInstance == null) { |
157 _controllerInstance = new StreamController<List<int>>( | 157 _controllerInstance = new StreamController<List<int>>( |
158 onPauseStateChange: _onPauseStateChange, | 158 onPauseStateChange: _onPauseStateChange, |
159 onSubscriptionStateChange: _onSubscriptionStateChange); | 159 onSubscriptionStateChange: _onSubscriptionStateChange); |
160 var future = _controller.stream.pipe(_target); | 160 var future = _controller.stream.pipe(_target); |
161 future.then((_) => _completeWriteStreamCompleter(), | 161 future.then((_) => _completeWriteStreamCompleter(), |
162 onError: (error) => _completeWriteStreamCompleter(error)); | 162 onError: (error) => _completeWriteStreamCompleter(error)); |
163 _pipeFuture = future.then((_) => this); | 163 _pipeFuture = future.then((value) => value); |
164 } | 164 } |
165 return _controllerInstance; | 165 return _controllerInstance; |
166 } | 166 } |
167 | 167 |
168 bool get _isBound => _bindSubscription != null; | 168 bool get _isBound => _bindSubscription != null; |
169 | 169 |
170 void _onPauseStateChange() { | 170 void _onPauseStateChange() { |
171 _paused = _controller.isPaused; | 171 _paused = _controller.isPaused; |
172 if (_controller.isPaused) { | 172 if (_controller.isPaused) { |
173 _pause(); | 173 _pause(); |
(...skipping 47 matching lines...) Expand 10 before | Expand all | Expand 10 after Loading... |
221 _completeWriteStreamCompleter(); | 221 _completeWriteStreamCompleter(); |
222 } else { | 222 } else { |
223 _controller.close(); | 223 _controller.close(); |
224 } | 224 } |
225 }, | 225 }, |
226 onError: _controller.addError); | 226 onError: _controller.addError); |
227 if (_paused) _pause(); | 227 if (_paused) _pause(); |
228 if (unbind) { | 228 if (unbind) { |
229 return _writeStreamCompleter.future; | 229 return _writeStreamCompleter.future; |
230 } else { | 230 } else { |
231 return _pipeFuture.then((_) => this); | 231 return _pipeFuture; |
232 } | 232 } |
233 } | 233 } |
234 } | 234 } |
OLD | NEW |